Source code for gkeepapi

""".. moduleauthor:: Kai <z@kwi.li>"""

__version__ = "0.16.0"

import datetime
import http
import logging
import random
import re
import time
from collections.abc import Callable, Iterator
from typing import IO, Any
from uuid import getnode as get_mac

import gpsoauth
import requests

from . import exception
from . import node as _node

logger = logging.getLogger(__name__)


[docs] class APIAuth: """Authentication token manager""" def __init__(self, scopes: str) -> None: """Construct API authentication manager""" self._master_token = None self._auth_token = None self._email = None self._device_id = None self._scopes = scopes
[docs] def login(self, email: str, password: str, device_id: str) -> None: """Authenticate to Google with the provided credentials. Args: email: The account to use. password: The account password. device_id: An identifier for this client. Raises: LoginException: If there was a problem logging in. """ self._email = email self._device_id = device_id # Obtain a master token. res = gpsoauth.perform_master_login(self._email, password, self._device_id) # Bail if browser login is required. if res.get("Error") == "NeedsBrowser": raise exception.BrowserLoginRequiredException(res.get("Url")) # Bail if no token was returned. if "Token" not in res: raise exception.LoginException(res.get("Error"), res.get("ErrorDetail")) self._master_token = res["Token"] # Obtain an OAuth token. self.refresh()
[docs] def load(self, email: str, master_token: str, device_id: str) -> bool: """Authenticate to Google with the provided master token. Args: email: The account to use. master_token: The master token. device_id: An identifier for this client. Raises: LoginException: If there was a problem logging in. """ self._email = email self._device_id = device_id self._master_token = master_token # Obtain an OAuth token. self.refresh() return True
[docs] def getMasterToken(self) -> str: """Gets the master token. Returns: The account master token. """ return self._master_token
[docs] def setMasterToken(self, master_token: str) -> None: """Sets the master token. This is useful if you'd like to authenticate with the API without providing your username & password. Do note that the master token has full access to your account. Args: master_token: The account master token. """ self._master_token = master_token
[docs] def getEmail(self) -> str: """Gets the account email. Returns: The account email. """ return self._email
[docs] def setEmail(self, email: str) -> None: """Sets the account email. Args: email: The account email. """ self._email = email
[docs] def getDeviceId(self) -> str: """Gets the device id. Returns: The device id. """ return self._device_id
[docs] def setDeviceId(self, device_id: str) -> None: """Sets the device id. Args: device_id: The device id. """ self._device_id = device_id
[docs] def getAuthToken(self) -> str | None: """Gets the auth token. Returns: The auth token. """ return self._auth_token
[docs] def refresh(self) -> str: """Refresh the OAuth token. Returns: The auth token. Raises: LoginException: If there was a problem refreshing the OAuth token. """ # Obtain an OAuth token with the necessary scopes by pretending to be # the keep android client. res = gpsoauth.perform_oauth( self._email, self._master_token, self._device_id, service=self._scopes, app="com.google.android.keep", client_sig="38918a453d07199354f8b19af05ec6562ced5788", ) # Bail if no token was returned. if "Auth" not in res and "Token" not in res: raise exception.LoginException(res.get("Error")) self._auth_token = res["Auth"] return self._auth_token
[docs] def logout(self) -> None: """Log out of the account.""" self._master_token = None self._auth_token = None self._email = None self._device_id = None
[docs] class API: """Base API wrapper""" RETRY_CNT = 2 def __init__(self, base_url: str, auth: APIAuth | None = None) -> None: """Construct a low-level API client""" self._session = requests.Session() self._auth = auth self._base_url = base_url self._session.headers.update( { "User-Agent": "x-gkeepapi/%s (https://github.com/kiwiz/gkeepapi)" % __version__ } )
[docs] def getAuth(self) -> APIAuth: """Get authentication details for this API. Return: auth: The auth object """ return self._auth
[docs] def setAuth(self, auth: APIAuth) -> None: """Set authentication details for this API. Args: auth: The auth object """ self._auth = auth
[docs] def send(self, **req_kwargs: dict) -> dict: """Send an authenticated request to a Google API. Automatically retries if the access token has expired. Args: **req_kwargs: Arbitrary keyword arguments to pass to Requests. Return: The parsed JSON response. Raises: APIException: If the server returns an error. LoginException: If session is not authenticated. """ # Send a request to the API servers, with retry handling. OAuth tokens # are valid for several hours (as of this comment). i = 0 while True: # Send off the request. If there was no error, we're good. response = self._send(**req_kwargs).json() if "error" not in response: break # Otherwise, check if it was a non-401 response code. These aren't # handled, so bail. error = response["error"] if error["code"] != http.HTTPStatus.UNAUTHORIZED: raise exception.APIException(error["code"], error) # If we've exceeded the retry limit, also bail. if i >= self.RETRY_CNT: raise exception.APIException(error["code"], error) # Otherwise, try requesting a new OAuth token. logger.info("Refreshing access token") self._auth.refresh() i += 1 return response
def _send(self, **req_kwargs: dict) -> requests.Response: """Send an authenticated request to a Google API. Args: **req_kwargs: Arbitrary keyword arguments to pass to Requests. Return: The raw response. Raises: LoginException: If session is not authenticated. """ # Bail if we don't have an OAuth token. auth_token = self._auth.getAuthToken() if auth_token is None: raise exception.LoginException("Not logged in") # Add the token to the request. req_kwargs.setdefault("headers", {"Authorization": "OAuth " + auth_token}) return self._session.request(**req_kwargs)
[docs] class KeepAPI(API): """Low level Google Keep API client. Mimics the Android Google Keep app. You probably want to use :py:class:`Keep` instead. """ API_URL = "https://www.googleapis.com/notes/v1/" def __init__(self, auth: APIAuth | None = None) -> None: """Construct a low-level Google Keep API client""" super().__init__(self.API_URL, auth) create_time = time.time() self._session_id = self._generateId(create_time) @classmethod def _generateId(cls, tz: int) -> str: return "s--%d--%d" % ( int(tz * 1000), random.randint(1000000000, 9999999999), # noqa: S311 )
[docs] def changes( self, target_version: str | None = None, nodes: list[dict] | None = None, labels: list[dict] | None = None, ) -> dict: """Sync up (and down) all changes. Args: target_version: The local change version. nodes: A list of nodes to sync up to the server. labels: A list of labels to sync up to the server. Return: Description of all changes. Raises: APIException: If the server returns an error. """ # Handle defaults. if nodes is None: nodes = [] if labels is None: labels = [] current_time = time.time() # Initialize request parameters. params = { "nodes": nodes, "clientTimestamp": _node.NodeTimestamps.int_to_str(current_time), "requestHeader": { "clientSessionId": self._session_id, "clientPlatform": "ANDROID", "clientVersion": { "major": "9", "minor": "9", "build": "9", "revision": "9", }, "capabilities": [ {"type": "NC"}, # Color support (Send note color) {"type": "PI"}, # Pinned support (Send note pinned) {"type": "LB"}, # Labels support (Send note labels) {"type": "AN"}, # Annotations support (Send annotations) {"type": "SH"}, # Sharing support {"type": "DR"}, # Drawing support {"type": "TR"}, # Trash support (Stop setting the delete timestamp) {"type": "IN"}, # Indentation support (Send listitem parent) {"type": "SNB"}, # Allows modification of shared notes? {"type": "MI"}, # Concise blob info? {"type": "CO"}, # VSS_SUCCEEDED when off? # TODO: Figure out what these do: # {'type': 'EC'}, # ??? # {'type': 'RB'}, # Rollback? # {'type': 'EX'}, # ??? ], }, } # Add the targetVersion if set. This tells the server what version the # client is currently at. if target_version is not None: params["targetVersion"] = target_version # Add any new or updated labels to the request. if labels: params["userInfo"] = {"labels": labels} logger.debug("Syncing %d labels and %d nodes", len(labels), len(nodes)) return self.send(url=self._base_url + "changes", method="POST", json=params)
[docs] class MediaAPI(API): """Low level Google Media API client. Mimics the Android Google Keep app. You probably want to use :py:class:`Keep` instead. """ API_URL = "https://keep.google.com/media/v2/" def __init__(self, auth: APIAuth | None = None) -> None: """Construct a low-level Google Media API client""" super().__init__(self.API_URL, auth)
[docs] def get(self, blob: _node.Blob) -> str: """Get the canonical link to a media blob. Args: blob: The blob. Returns: A link to the media. """ url = self._base_url + blob.parent.server_id + "/" + blob.server_id if blob.blob.type == _node.BlobType.Drawing: url += "/" + blob.blob._drawing_info.drawing_id # noqa: SLF001 return self._send(url=url, method="GET", allow_redirects=False).headers[ "location" ]
[docs] class RemindersAPI(API): """Low level Google Reminders API client. Mimics the Android Google Keep app. You probably want to use :py:class:`Keep` instead. """ API_URL = "https://www.googleapis.com/reminders/v1internal/reminders/" def __init__(self, auth: APIAuth | None = None) -> None: """Construct a low-level Google Reminders API client""" super().__init__(self.API_URL, auth) self.static_params = { "taskList": [ {"systemListId": "MEMENTO"}, ], "requestParameters": { "userAgentStructured": { "clientApplication": "KEEP", "clientApplicationVersion": { "major": "9", "minor": "9.9.9.9", }, "clientPlatform": "ANDROID", }, }, }
[docs] def create( self, node_id: str, node_server_id: str, dtime: datetime.datetime ) -> Any: # noqa: ANN401 """Create a new reminder. Args: node_id: The note ID. node_server_id: The note server ID. dtime: The due date of this reminder. Return: ??? Raises: APIException: If the server returns an error. """ params = {} params.update(self.static_params) params.update( { "task": { "dueDate": { "year": dtime.year, "month": dtime.month, "day": dtime.day, "time": { "hour": dtime.hour, "minute": dtime.minute, "second": dtime.second, }, }, "snoozed": True, "extensions": { "keepExtension": { "reminderVersion": "V2", "clientNoteId": node_id, "serverNoteId": node_server_id, }, }, }, "taskId": { "clientAssignedId": "KEEP/v2/" + node_server_id, }, } ) return self.send(url=self._base_url + "create", method="POST", json=params)
[docs] def update_internal( self, node_id: str, node_server_id: str, dtime: datetime.datetime ) -> Any: # noqa: ANN401 """Update an existing reminder. Args: node_id: The note ID. node_server_id: The note server ID. dtime: The due date of this reminder. Return: ??? Raises: APIException: If the server returns an error. """ params = {} params.update(self.static_params) params.update( { "newTask": { "dueDate": { "year": dtime.year, "month": dtime.month, "day": dtime.day, "time": { "hour": dtime.hour, "minute": dtime.minute, "second": dtime.second, }, }, "snoozed": True, "extensions": { "keepExtension": { "reminderVersion": "V2", "clientNoteId": node_id, "serverNoteId": node_server_id, }, }, }, "taskId": { "clientAssignedId": "KEEP/v2/" + node_server_id, }, "updateMask": { "updateField": [ "ARCHIVED", "DUE_DATE", "EXTENSIONS", "LOCATION", "TITLE", ] }, } ) return self.send(url=self._base_url + "update", method="POST", json=params)
[docs] def delete(self, node_server_id: str) -> Any: # noqa: ANN401 """Delete an existing reminder. Args: node_server_id: The note server ID. Return: ??? Raises: APIException: If the server returns an error. """ params = {} params.update(self.static_params) params.update( { "batchedRequest": [ { "deleteTask": { "taskId": [ {"clientAssignedId": "KEEP/v2/" + node_server_id} ] } } ] } ) return self.send(url=self._base_url + "batchmutate", method="POST", json=params)
[docs] def list(self, master: bool = True) -> Any: # noqa: ANN401 """List current reminders. Args: master: ??? Return: ??? Raises: APIException: If the server returns an error. """ params = {} params.update(self.static_params) if master: params.update( { "recurrenceOptions": { "collapseMode": "MASTER_ONLY", }, "includeArchived": True, "includeDeleted": False, } ) else: current_time = time.time() start_time = int((current_time - (365 * 24 * 60 * 60)) * 1000) end_time = int((current_time + (24 * 60 * 60)) * 1000) params.update( { "recurrenceOptions": { "collapseMode": "INSTANCES_ONLY", "recurrencesOnly": True, }, "includeArchived": False, "includeCompleted": False, "includeDeleted": False, "dueAfterMs": start_time, "dueBeforeMs": end_time, "recurrenceId": [], } ) return self.send(url=self._base_url + "list", method="POST", json=params)
[docs] def history(self, storage_version: str) -> Any: # noqa: ANN401 """Get reminder changes. Args: storage_version: The local storage version. Returns: ??? Raises: APIException: If the server returns an error. """ params = { "storageVersion": storage_version, "includeSnoozePresetUpdates": True, } params.update(self.static_params) return self.send(url=self._base_url + "history", method="POST", json=params)
[docs] def update(self) -> Any: # noqa: ANN401 """Sync up changes to reminders.""" params = {} return self.send(url=self._base_url + "update", method="POST", json=params)
[docs] class Keep: """High level Google Keep client. Manipulates a local copy of the Keep node tree. First, obtain a master token for your account. To start, first authenticate:: keep.authenticate('...', '...') Individual Notes can be retrieved by id:: some_note = keep.get('some_id') New Notes can be created:: new_note = keep.createNote() These Notes can then be modified:: some_note.text = 'Test' new_note.text = 'Text' These changes are automatically detected and synced up with:: keep.sync() """ OAUTH_SCOPES = "oauth2:https://www.googleapis.com/auth/memento https://www.googleapis.com/auth/reminders" def __init__(self) -> None: """Construct a Google Keep client""" self._keep_api = KeepAPI() self._reminders_api = RemindersAPI() self._media_api = MediaAPI() self._keep_version = None self._reminder_version = None self._labels = {} self._nodes = {} self._sid_map = {} self._clear() def _clear(self) -> None: self._keep_version = None self._reminder_version = None self._labels = {} self._nodes = {} self._sid_map = {} root_node = _node.Root() self._nodes[_node.Root.ID] = root_node
[docs] def login( self, email: str, password: str, state: dict | None = None, sync: bool = True, device_id: str | None = None, ) -> None: """Authenticate to Google with the provided credentials & sync. This flow is discouraged. Args: email: The account to use. password: The account password. state: Serialized state to load. sync: Whether to sync data. device_id: Device id. Raises: LoginException: If there was a problem logging in. """ logger.warning("'Keep.login' is deprecated. Please use 'Keep.authenticate' instead") auth = APIAuth(self.OAUTH_SCOPES) if device_id is None: device_id = f"{get_mac():x}" auth.login(email, password, device_id) self.load(auth, state, sync)
[docs] def resume( self, email: str, master_token: str, state: dict | None = None, sync: bool = True, device_id: str | None = None, ) -> None: logger.warning("'Keep.resume' has been renamed to 'Keep.authenticate'. Please update your code") self.authenticate(email, master_token, state, sync, device_id)
[docs] def authenticate( self, email: str, master_token: str, state: dict | None = None, sync: bool = True, device_id: str | None = None, ) -> None: """Authenticate to Google with the provided master token & sync. Args: email: The account to use. master_token: The master token. state: Serialized state to load. sync: Whether to sync data. device_id: Device id. Raises: LoginException: If there was a problem logging in. """ auth = APIAuth(self.OAUTH_SCOPES) if device_id is None: device_id = f"{get_mac():x}" auth.load(email, master_token, device_id) self.load(auth, state, sync)
[docs] def getMasterToken(self) -> str: """Get master token for resuming. Returns: The master token. """ return self._keep_api.getAuth().getMasterToken()
[docs] def load(self, auth: APIAuth, state: dict | None = None, sync: bool = True) -> None: """Authenticate to Google with a prepared authentication object & sync. Args: auth: Authentication object. state: Serialized state to load. sync: Whether to sync data. Raises: LoginException: If there was a problem logging in. """ self._keep_api.setAuth(auth) self._reminders_api.setAuth(auth) self._media_api.setAuth(auth) if state is not None: self.restore(state) if sync: self.sync()
[docs] def dump(self) -> dict: """Serialize note data. Returns: Serialized state. """ # Find all nodes manually, as the Keep object isn't aware of new # ListItems until they've been synced to the server. nodes = [] for node in self.all(): nodes.append(node) nodes.extend(node.children) return { "keep_version": self._keep_version, "labels": [label.save(False) for label in self.labels()], "nodes": [node.save(False) for node in nodes], }
[docs] def restore(self, state: dict) -> None: """Unserialize saved note data. Args: state: Serialized state to load. """ self._clear() self._parseUserInfo({"labels": state["labels"]}) self._parseNodes(state["nodes"]) self._keep_version = state["keep_version"]
[docs] def get(self, node_id: str) -> _node.TopLevelNode: """Get a note with the given ID. Args: node_id: The note ID. Returns: The Note or None if not found. """ return self._nodes[_node.Root.ID].get(node_id) or self._nodes[ _node.Root.ID ].get(self._sid_map.get(node_id))
[docs] def add(self, node: _node.Node) -> None: """Register a top level node (and its children) for syncing up to the server. There's no need to call this for nodes created by :py:meth:`createNote` or :py:meth:`createList` as they are automatically added. Args: node: The node to sync. Raises: InvalidException: If the parent node is not found. """ if node.parent_id != _node.Root.ID: raise exception.InvalidException("Not a top level node") self._nodes[node.id] = node self._nodes[node.parent_id].append(node, False)
[docs] def find( self, query: re.Pattern | str | None = None, func: Callable | None = None, labels: list[str] | None = None, colors: list[str] | None = None, pinned: bool | None = None, archived: bool | None = None, trashed: bool = False, ) -> Iterator[_node.TopLevelNode]: """Find Notes based on the specified criteria. Args: query: A str or regular expression to match against the title and text. func: A filter function. labels: A list of label ids or objects to match. An empty list matches notes with no labels. colors: A list of colors to match. pinned: Whether to match pinned notes. archived: Whether to match archived notes. trashed: Whether to match trashed notes. Return: Search results. """ if labels is not None: labels = [i.id if isinstance(i, _node.Label) else i for i in labels] return ( node for node in self.all() if # Process the query. ( query is None or ( ( isinstance(query, str) and (query in node.title or query in node.text) ) or ( isinstance(query, re.Pattern) and (query.search(node.title) or query.search(node.text)) ) ) ) and # Process the func. (func is None or func(node)) and ( # Process the labels. labels is None or (not labels and not node.labels.all()) or (any(node.labels.get(i) is not None for i in labels)) ) and (colors is None or node.color in colors) # Process the colors. and (pinned is None or node.pinned == pinned) # Process the pinned state. and ( # Process the archive state. archived is None or node.archived == archived ) and (trashed is None or node.trashed == trashed) # Process the trash state. )
[docs] def createNote( self, title: str | None = None, text: str | None = None ) -> _node.Node: """Create a new managed note. Any changes to the note will be uploaded when :py:meth:`sync` is called. Args: title: The title of the note. text: The text of the note. Returns: The new note. """ node = _node.Note() if title is not None: node.title = title if text is not None: node.text = text self.add(node) return node
[docs] def createList( self, title: str | None = None, items: list[tuple[str, bool]] | None = None, ) -> _node.List: """Create a new list and populate it. Any changes to the note will be uploaded when :py:meth:`sync` is called. Args: title: The title of the list. items: A list of tuples. Each tuple represents the text and checked status of the listitem. Returns: The new list. """ if items is None: items = [] node = _node.List() if title is not None: node.title = title sort = random.randint(1000000000, 9999999999) # noqa: S311 for text, checked in items: node.add(text, checked, sort) sort -= _node.List.SORT_DELTA self.add(node) return node
[docs] def createLabel(self, name: str) -> _node.Label: """Create a new label. Args: name: Label name. Returns: The new label. Raises: LabelException: If the label exists. """ if self.findLabel(name): raise exception.LabelException("Label exists") node = _node.Label() node.name = name self._labels[node.id] = node return node
[docs] def findLabel( self, query: re.Pattern | str, create: bool = False ) -> _node.Label | None: """Find a label with the given name. Args: query: A str or regular expression to match against the name. create: Whether to create the label if it doesn't exist (only if name is a str). Returns: The label. """ is_str = isinstance(query, str) name = None if is_str: name = query query = query.lower() for label in self._labels.values(): # Match the label against query, which may be a str or Pattern. if (is_str and query == label.name.lower()) or ( isinstance(query, re.Pattern) and query.search(label.name) ): return label return self.createLabel(name) if create and is_str else None
[docs] def getLabel(self, label_id: str) -> _node.Label | None: """Get an existing label. Args: label_id: Label id. Returns: The label. """ return self._labels.get(label_id)
[docs] def deleteLabel(self, label_id: str) -> None: """Deletes a label. Args: label_id: Label id. """ if label_id not in self._labels: return label = self._labels[label_id] label.delete() for node in self.all(): node.labels.remove(label)
[docs] def labels(self) -> list[_node.Label]: """Get all labels. Returns: Labels """ return list(self._labels.values())
def __UNSTABLE_API_uploadMedia(self, fh: IO)-> None: pass
[docs] def all(self) -> list[_node.TopLevelNode]: """Get all Notes. Returns: All notes. """ return self._nodes[_node.Root.ID].children
[docs] def sync(self, resync: bool = False) -> None: """Sync the local Keep tree with the server. If resyncing, local changes will be destroyed. Otherwise, local changes to notes, labels and reminders will be detected and synced up. Args: resync: Whether to resync data. Raises: SyncException: If there is a consistency issue. """ # Clear all state if we want to resync. if resync: self._clear() # self._sync_reminders() self._sync_notes() if _node.DEBUG: self._clean()
def _sync_reminders(self) -> None: # Fetch updates until we reach the newest version. while True: logger.debug("Starting reminder sync: %s", self._reminder_version) changes = self._reminders_api.list() # Hydrate the individual "tasks". if "task" in changes: self._parseTasks(changes["task"]) self._reminder_version = changes["storageVersion"] logger.debug("Finishing sync: %s", self._reminder_version) # Check if we've reached the newest version. history = self._reminders_api.history(self._reminder_version) if self._reminder_version == history["highestStorageVersion"]: break def _sync_notes(self) -> None: # Fetch updates until we reach the newest version. while True: logger.debug("Starting keep sync: %s", self._keep_version) # Collect any changes and send them up to the server. labels_updated = any(i.dirty for i in self._labels.values()) changes = self._keep_api.changes( target_version=self._keep_version, nodes=[i.save() for i in self._findDirtyNodes()], labels=[i.save() for i in self._labels.values()] if labels_updated else None, ) if changes.get("forceFullResync"): raise exception.ResyncRequiredException("Full resync required") if changes.get("upgradeRecommended"): raise exception.UpgradeRecommendedException("Upgrade recommended") # Hydrate labels. if "userInfo" in changes: self._parseUserInfo(changes["userInfo"]) # Hydrate notes and any children. if "nodes" in changes: self._parseNodes(changes["nodes"]) self._keep_version = changes["toVersion"] logger.debug("Finishing sync: %s", self._keep_version) # Check if there are more changes to retrieve. if not changes["truncated"]: break def _parseTasks(self, raw: dict) -> None: pass def _parseNodes(self, raw: dict) -> None: # noqa: C901, PLR0912 created_nodes = [] deleted_nodes = [] listitem_nodes = [] # Loop over each updated node. for raw_node in raw: # If the id exists, then we already know about it. In other words, # update a local node. if raw_node["id"] in self._nodes: node = self._nodes[raw_node["id"]] if "parentId" in raw_node: # If the parentId field is set, this is an update. Load it # into the existing node. node.load(raw_node) self._sid_map[node.server_id] = node.id logger.debug("Updated node: %s", raw_node["id"]) else: # Otherwise, this node has been deleted. Add it to the list. deleted_nodes.append(node) else: # Otherwise, this is a new node. Attempt to hydrate it. node = _node.from_json(raw_node) if node is None: logger.debug("Discarded unknown node") else: # Append the new node into the node tree. self._nodes[raw_node["id"]] = node self._sid_map[node.server_id] = node.id created_nodes.append(node) logger.debug("Created node: %s", raw_node["id"]) # If the node is a listitem, keep track of it. if isinstance(node, _node.ListItem): listitem_nodes.append(node) # Attach each listitem to its parent list. Indented items point to their # parent listitem, so we need to traverse up until we reach the list. for node in listitem_nodes: prev = node.prev_super_list_item_id curr = node.super_list_item_id if prev == curr: continue # Apply proper indentation. if prev is not None and prev in self._nodes: self._nodes[prev].dedent(node, False) if curr is not None and curr in self._nodes: self._nodes[curr].indent(node, False) # Attach created nodes to the tree. for node in created_nodes: logger.debug( "Attached node: %s to %s", node.id if node else None, node.parent_id if node else None, ) parent_node = self._nodes.get(node.parent_id) parent_node.append(node, False) # Detach deleted nodes from the tree. for node in deleted_nodes: node.parent.remove(node) del self._nodes[node.id] if node.server_id is not None: del self._sid_map[node.server_id] logger.debug("Deleted node: %s", node.id) # Hydrate label references in notes. for node in self.all(): for label_id in node.labels._labels: # noqa: SLF001 node.labels._labels[label_id] = self._labels.get( # noqa: SLF001 label_id ) def _parseUserInfo(self, raw: dict) -> None: labels = {} if "labels" in raw: for label in raw["labels"]: # If the mainId field exists, this is an update. if label["mainId"] in self._labels: node = self._labels[label["mainId"]] # Remove this key from our list of labels. del self._labels[label["mainId"]] logger.debug("Updated label: %s", label["mainId"]) else: # Otherwise, this is a brand new label. node = _node.Label() logger.debug("Created label: %s", label["mainId"]) node.load(label) labels[label["mainId"]] = node # All remaining labels are deleted. for label_id in self._labels: logger.debug("Deleted label: %s", label_id) self._labels = labels def _findDirtyNodes(self) -> list[_node.Node]: # Find nodes that aren't in our internal nodes list and insert them. for node in list(self._nodes.values()): for child in node.children: if child.id not in self._nodes: self._nodes[child.id] = child # Collect all dirty nodes (any nodes from above will be caught too). return [node for node in self._nodes.values() if node.dirty] def _clean(self) -> None: """Recursively check that all nodes are reachable.""" found_ids = set() nodes = [self._nodes[_node.Root.ID]] # Enumerate all nodes from the root node while nodes: node = nodes.pop() found_ids.add(node.id) nodes = nodes + node.children # Find nodes that can't be reached from the root for node_id in self._nodes: if node_id in found_ids: continue logger.error("Dangling node: %s", node_id) # Find nodes that don't exist in the collection for node_id in found_ids: if node_id in self._nodes: continue logger.error("Unregistered node: %s", node_id)