""".. moduleauthor:: Kai <z@kwi.li>"""
__version__ = "0.16.0"
import datetime
import http
import logging
import random
import re
import time
from collections.abc import Callable, Iterator
from typing import IO, Any
from uuid import getnode as get_mac
import gpsoauth
import requests
from . import exception
from . import node as _node
logger = logging.getLogger(__name__)
[docs]
class APIAuth:
"""Authentication token manager"""
def __init__(self, scopes: str) -> None:
"""Construct API authentication manager"""
self._master_token = None
self._auth_token = None
self._email = None
self._device_id = None
self._scopes = scopes
[docs]
def login(self, email: str, password: str, device_id: str) -> None:
"""Authenticate to Google with the provided credentials.
Args:
email: The account to use.
password: The account password.
device_id: An identifier for this client.
Raises:
LoginException: If there was a problem logging in.
"""
self._email = email
self._device_id = device_id
# Obtain a master token.
res = gpsoauth.perform_master_login(self._email, password, self._device_id)
# Bail if browser login is required.
if res.get("Error") == "NeedsBrowser":
raise exception.BrowserLoginRequiredException(res.get("Url"))
# Bail if no token was returned.
if "Token" not in res:
raise exception.LoginException(res.get("Error"), res.get("ErrorDetail"))
self._master_token = res["Token"]
# Obtain an OAuth token.
self.refresh()
[docs]
def load(self, email: str, master_token: str, device_id: str) -> bool:
"""Authenticate to Google with the provided master token.
Args:
email: The account to use.
master_token: The master token.
device_id: An identifier for this client.
Raises:
LoginException: If there was a problem logging in.
"""
self._email = email
self._device_id = device_id
self._master_token = master_token
# Obtain an OAuth token.
self.refresh()
return True
[docs]
def getMasterToken(self) -> str:
"""Gets the master token.
Returns:
The account master token.
"""
return self._master_token
[docs]
def setMasterToken(self, master_token: str) -> None:
"""Sets the master token. This is useful if you'd like to authenticate with the API without providing your username & password. Do note that the master token has full access to your account.
Args:
master_token: The account master token.
"""
self._master_token = master_token
[docs]
def getEmail(self) -> str:
"""Gets the account email.
Returns:
The account email.
"""
return self._email
[docs]
def setEmail(self, email: str) -> None:
"""Sets the account email.
Args:
email: The account email.
"""
self._email = email
[docs]
def getDeviceId(self) -> str:
"""Gets the device id.
Returns:
The device id.
"""
return self._device_id
[docs]
def setDeviceId(self, device_id: str) -> None:
"""Sets the device id.
Args:
device_id: The device id.
"""
self._device_id = device_id
[docs]
def getAuthToken(self) -> str | None:
"""Gets the auth token.
Returns:
The auth token.
"""
return self._auth_token
[docs]
def refresh(self) -> str:
"""Refresh the OAuth token.
Returns:
The auth token.
Raises:
LoginException: If there was a problem refreshing the OAuth token.
"""
# Obtain an OAuth token with the necessary scopes by pretending to be
# the keep android client.
res = gpsoauth.perform_oauth(
self._email,
self._master_token,
self._device_id,
service=self._scopes,
app="com.google.android.keep",
client_sig="38918a453d07199354f8b19af05ec6562ced5788",
)
# Bail if no token was returned.
if "Auth" not in res and "Token" not in res:
raise exception.LoginException(res.get("Error"))
self._auth_token = res["Auth"]
return self._auth_token
[docs]
def logout(self) -> None:
"""Log out of the account."""
self._master_token = None
self._auth_token = None
self._email = None
self._device_id = None
[docs]
class API:
"""Base API wrapper"""
RETRY_CNT = 2
def __init__(self, base_url: str, auth: APIAuth | None = None) -> None:
"""Construct a low-level API client"""
self._session = requests.Session()
self._auth = auth
self._base_url = base_url
self._session.headers.update(
{
"User-Agent": "x-gkeepapi/%s (https://github.com/kiwiz/gkeepapi)"
% __version__
}
)
[docs]
def getAuth(self) -> APIAuth:
"""Get authentication details for this API.
Return:
auth: The auth object
"""
return self._auth
[docs]
def setAuth(self, auth: APIAuth) -> None:
"""Set authentication details for this API.
Args:
auth: The auth object
"""
self._auth = auth
[docs]
def send(self, **req_kwargs: dict) -> dict:
"""Send an authenticated request to a Google API. Automatically retries if the access token has expired.
Args:
**req_kwargs: Arbitrary keyword arguments to pass to Requests.
Return:
The parsed JSON response.
Raises:
APIException: If the server returns an error.
LoginException: If session is not authenticated.
"""
# Send a request to the API servers, with retry handling. OAuth tokens
# are valid for several hours (as of this comment).
i = 0
while True:
# Send off the request. If there was no error, we're good.
response = self._send(**req_kwargs).json()
if "error" not in response:
break
# Otherwise, check if it was a non-401 response code. These aren't
# handled, so bail.
error = response["error"]
if error["code"] != http.HTTPStatus.UNAUTHORIZED:
raise exception.APIException(error["code"], error)
# If we've exceeded the retry limit, also bail.
if i >= self.RETRY_CNT:
raise exception.APIException(error["code"], error)
# Otherwise, try requesting a new OAuth token.
logger.info("Refreshing access token")
self._auth.refresh()
i += 1
return response
def _send(self, **req_kwargs: dict) -> requests.Response:
"""Send an authenticated request to a Google API.
Args:
**req_kwargs: Arbitrary keyword arguments to pass to Requests.
Return:
The raw response.
Raises:
LoginException: If session is not authenticated.
"""
# Bail if we don't have an OAuth token.
auth_token = self._auth.getAuthToken()
if auth_token is None:
raise exception.LoginException("Not logged in")
# Add the token to the request.
req_kwargs.setdefault("headers", {"Authorization": "OAuth " + auth_token})
return self._session.request(**req_kwargs)
[docs]
class KeepAPI(API):
"""Low level Google Keep API client. Mimics the Android Google Keep app.
You probably want to use :py:class:`Keep` instead.
"""
API_URL = "https://www.googleapis.com/notes/v1/"
def __init__(self, auth: APIAuth | None = None) -> None:
"""Construct a low-level Google Keep API client"""
super().__init__(self.API_URL, auth)
create_time = time.time()
self._session_id = self._generateId(create_time)
@classmethod
def _generateId(cls, tz: int) -> str:
return "s--%d--%d" % (
int(tz * 1000),
random.randint(1000000000, 9999999999), # noqa: S311
)
[docs]
def changes(
self,
target_version: str | None = None,
nodes: list[dict] | None = None,
labels: list[dict] | None = None,
) -> dict:
"""Sync up (and down) all changes.
Args:
target_version: The local change version.
nodes: A list of nodes to sync up to the server.
labels: A list of labels to sync up to the server.
Return:
Description of all changes.
Raises:
APIException: If the server returns an error.
"""
# Handle defaults.
if nodes is None:
nodes = []
if labels is None:
labels = []
current_time = time.time()
# Initialize request parameters.
params = {
"nodes": nodes,
"clientTimestamp": _node.NodeTimestamps.int_to_str(current_time),
"requestHeader": {
"clientSessionId": self._session_id,
"clientPlatform": "ANDROID",
"clientVersion": {
"major": "9",
"minor": "9",
"build": "9",
"revision": "9",
},
"capabilities": [
{"type": "NC"}, # Color support (Send note color)
{"type": "PI"}, # Pinned support (Send note pinned)
{"type": "LB"}, # Labels support (Send note labels)
{"type": "AN"}, # Annotations support (Send annotations)
{"type": "SH"}, # Sharing support
{"type": "DR"}, # Drawing support
{"type": "TR"}, # Trash support (Stop setting the delete timestamp)
{"type": "IN"}, # Indentation support (Send listitem parent)
{"type": "SNB"}, # Allows modification of shared notes?
{"type": "MI"}, # Concise blob info?
{"type": "CO"}, # VSS_SUCCEEDED when off?
# TODO: Figure out what these do:
# {'type': 'EC'}, # ???
# {'type': 'RB'}, # Rollback?
# {'type': 'EX'}, # ???
],
},
}
# Add the targetVersion if set. This tells the server what version the
# client is currently at.
if target_version is not None:
params["targetVersion"] = target_version
# Add any new or updated labels to the request.
if labels:
params["userInfo"] = {"labels": labels}
logger.debug("Syncing %d labels and %d nodes", len(labels), len(nodes))
return self.send(url=self._base_url + "changes", method="POST", json=params)
[docs]
class RemindersAPI(API):
"""Low level Google Reminders API client. Mimics the Android Google Keep app.
You probably want to use :py:class:`Keep` instead.
"""
API_URL = "https://www.googleapis.com/reminders/v1internal/reminders/"
def __init__(self, auth: APIAuth | None = None) -> None:
"""Construct a low-level Google Reminders API client"""
super().__init__(self.API_URL, auth)
self.static_params = {
"taskList": [
{"systemListId": "MEMENTO"},
],
"requestParameters": {
"userAgentStructured": {
"clientApplication": "KEEP",
"clientApplicationVersion": {
"major": "9",
"minor": "9.9.9.9",
},
"clientPlatform": "ANDROID",
},
},
}
[docs]
def create(
self, node_id: str, node_server_id: str, dtime: datetime.datetime
) -> Any: # noqa: ANN401
"""Create a new reminder.
Args:
node_id: The note ID.
node_server_id: The note server ID.
dtime: The due date of this reminder.
Return: ???
Raises:
APIException: If the server returns an error.
"""
params = {}
params.update(self.static_params)
params.update(
{
"task": {
"dueDate": {
"year": dtime.year,
"month": dtime.month,
"day": dtime.day,
"time": {
"hour": dtime.hour,
"minute": dtime.minute,
"second": dtime.second,
},
},
"snoozed": True,
"extensions": {
"keepExtension": {
"reminderVersion": "V2",
"clientNoteId": node_id,
"serverNoteId": node_server_id,
},
},
},
"taskId": {
"clientAssignedId": "KEEP/v2/" + node_server_id,
},
}
)
return self.send(url=self._base_url + "create", method="POST", json=params)
[docs]
def update_internal(
self, node_id: str, node_server_id: str, dtime: datetime.datetime
) -> Any: # noqa: ANN401
"""Update an existing reminder.
Args:
node_id: The note ID.
node_server_id: The note server ID.
dtime: The due date of this reminder.
Return: ???
Raises:
APIException: If the server returns an error.
"""
params = {}
params.update(self.static_params)
params.update(
{
"newTask": {
"dueDate": {
"year": dtime.year,
"month": dtime.month,
"day": dtime.day,
"time": {
"hour": dtime.hour,
"minute": dtime.minute,
"second": dtime.second,
},
},
"snoozed": True,
"extensions": {
"keepExtension": {
"reminderVersion": "V2",
"clientNoteId": node_id,
"serverNoteId": node_server_id,
},
},
},
"taskId": {
"clientAssignedId": "KEEP/v2/" + node_server_id,
},
"updateMask": {
"updateField": [
"ARCHIVED",
"DUE_DATE",
"EXTENSIONS",
"LOCATION",
"TITLE",
]
},
}
)
return self.send(url=self._base_url + "update", method="POST", json=params)
[docs]
def delete(self, node_server_id: str) -> Any: # noqa: ANN401
"""Delete an existing reminder.
Args:
node_server_id: The note server ID.
Return: ???
Raises:
APIException: If the server returns an error.
"""
params = {}
params.update(self.static_params)
params.update(
{
"batchedRequest": [
{
"deleteTask": {
"taskId": [
{"clientAssignedId": "KEEP/v2/" + node_server_id}
]
}
}
]
}
)
return self.send(url=self._base_url + "batchmutate", method="POST", json=params)
[docs]
def list(self, master: bool = True) -> Any: # noqa: ANN401
"""List current reminders.
Args:
master: ???
Return:
???
Raises:
APIException: If the server returns an error.
"""
params = {}
params.update(self.static_params)
if master:
params.update(
{
"recurrenceOptions": {
"collapseMode": "MASTER_ONLY",
},
"includeArchived": True,
"includeDeleted": False,
}
)
else:
current_time = time.time()
start_time = int((current_time - (365 * 24 * 60 * 60)) * 1000)
end_time = int((current_time + (24 * 60 * 60)) * 1000)
params.update(
{
"recurrenceOptions": {
"collapseMode": "INSTANCES_ONLY",
"recurrencesOnly": True,
},
"includeArchived": False,
"includeCompleted": False,
"includeDeleted": False,
"dueAfterMs": start_time,
"dueBeforeMs": end_time,
"recurrenceId": [],
}
)
return self.send(url=self._base_url + "list", method="POST", json=params)
[docs]
def history(self, storage_version: str) -> Any: # noqa: ANN401
"""Get reminder changes.
Args:
storage_version: The local storage version.
Returns:
???
Raises:
APIException: If the server returns an error.
"""
params = {
"storageVersion": storage_version,
"includeSnoozePresetUpdates": True,
}
params.update(self.static_params)
return self.send(url=self._base_url + "history", method="POST", json=params)
[docs]
def update(self) -> Any: # noqa: ANN401
"""Sync up changes to reminders."""
params = {}
return self.send(url=self._base_url + "update", method="POST", json=params)
[docs]
class Keep:
"""High level Google Keep client.
Manipulates a local copy of the Keep node tree. First, obtain a master token for your account.
To start, first authenticate::
keep.authenticate('...', '...')
Individual Notes can be retrieved by id::
some_note = keep.get('some_id')
New Notes can be created::
new_note = keep.createNote()
These Notes can then be modified::
some_note.text = 'Test'
new_note.text = 'Text'
These changes are automatically detected and synced up with::
keep.sync()
"""
OAUTH_SCOPES = "oauth2:https://www.googleapis.com/auth/memento https://www.googleapis.com/auth/reminders"
def __init__(self) -> None:
"""Construct a Google Keep client"""
self._keep_api = KeepAPI()
self._reminders_api = RemindersAPI()
self._media_api = MediaAPI()
self._keep_version = None
self._reminder_version = None
self._labels = {}
self._nodes = {}
self._sid_map = {}
self._clear()
def _clear(self) -> None:
self._keep_version = None
self._reminder_version = None
self._labels = {}
self._nodes = {}
self._sid_map = {}
root_node = _node.Root()
self._nodes[_node.Root.ID] = root_node
[docs]
def login(
self,
email: str,
password: str,
state: dict | None = None,
sync: bool = True,
device_id: str | None = None,
) -> None:
"""Authenticate to Google with the provided credentials & sync.
This flow is discouraged.
Args:
email: The account to use.
password: The account password.
state: Serialized state to load.
sync: Whether to sync data.
device_id: Device id.
Raises:
LoginException: If there was a problem logging in.
"""
logger.warning("'Keep.login' is deprecated. Please use 'Keep.authenticate' instead")
auth = APIAuth(self.OAUTH_SCOPES)
if device_id is None:
device_id = f"{get_mac():x}"
auth.login(email, password, device_id)
self.load(auth, state, sync)
[docs]
def resume(
self,
email: str,
master_token: str,
state: dict | None = None,
sync: bool = True,
device_id: str | None = None,
) -> None:
logger.warning("'Keep.resume' has been renamed to 'Keep.authenticate'. Please update your code")
self.authenticate(email, master_token, state, sync, device_id)
[docs]
def authenticate(
self,
email: str,
master_token: str,
state: dict | None = None,
sync: bool = True,
device_id: str | None = None,
) -> None:
"""Authenticate to Google with the provided master token & sync.
Args:
email: The account to use.
master_token: The master token.
state: Serialized state to load.
sync: Whether to sync data.
device_id: Device id.
Raises:
LoginException: If there was a problem logging in.
"""
auth = APIAuth(self.OAUTH_SCOPES)
if device_id is None:
device_id = f"{get_mac():x}"
auth.load(email, master_token, device_id)
self.load(auth, state, sync)
[docs]
def getMasterToken(self) -> str:
"""Get master token for resuming.
Returns:
The master token.
"""
return self._keep_api.getAuth().getMasterToken()
[docs]
def load(self, auth: APIAuth, state: dict | None = None, sync: bool = True) -> None:
"""Authenticate to Google with a prepared authentication object & sync.
Args:
auth: Authentication object.
state: Serialized state to load.
sync: Whether to sync data.
Raises:
LoginException: If there was a problem logging in.
"""
self._keep_api.setAuth(auth)
self._reminders_api.setAuth(auth)
self._media_api.setAuth(auth)
if state is not None:
self.restore(state)
if sync:
self.sync()
[docs]
def dump(self) -> dict:
"""Serialize note data.
Returns:
Serialized state.
"""
# Find all nodes manually, as the Keep object isn't aware of new
# ListItems until they've been synced to the server.
nodes = []
for node in self.all():
nodes.append(node)
nodes.extend(node.children)
return {
"keep_version": self._keep_version,
"labels": [label.save(False) for label in self.labels()],
"nodes": [node.save(False) for node in nodes],
}
[docs]
def restore(self, state: dict) -> None:
"""Unserialize saved note data.
Args:
state: Serialized state to load.
"""
self._clear()
self._parseUserInfo({"labels": state["labels"]})
self._parseNodes(state["nodes"])
self._keep_version = state["keep_version"]
[docs]
def get(self, node_id: str) -> _node.TopLevelNode:
"""Get a note with the given ID.
Args:
node_id: The note ID.
Returns:
The Note or None if not found.
"""
return self._nodes[_node.Root.ID].get(node_id) or self._nodes[
_node.Root.ID
].get(self._sid_map.get(node_id))
[docs]
def add(self, node: _node.Node) -> None:
"""Register a top level node (and its children) for syncing up to the server. There's no need to call this for nodes created by :py:meth:`createNote` or :py:meth:`createList` as they are automatically added.
Args:
node: The node to sync.
Raises:
InvalidException: If the parent node is not found.
"""
if node.parent_id != _node.Root.ID:
raise exception.InvalidException("Not a top level node")
self._nodes[node.id] = node
self._nodes[node.parent_id].append(node, False)
[docs]
def find(
self,
query: re.Pattern | str | None = None,
func: Callable | None = None,
labels: list[str] | None = None,
colors: list[str] | None = None,
pinned: bool | None = None,
archived: bool | None = None,
trashed: bool = False,
) -> Iterator[_node.TopLevelNode]:
"""Find Notes based on the specified criteria.
Args:
query: A str or regular expression to match against the title and text.
func: A filter function.
labels: A list of label ids or objects to match. An empty list matches notes with no labels.
colors: A list of colors to match.
pinned: Whether to match pinned notes.
archived: Whether to match archived notes.
trashed: Whether to match trashed notes.
Return:
Search results.
"""
if labels is not None:
labels = [i.id if isinstance(i, _node.Label) else i for i in labels]
return (
node
for node in self.all()
if
# Process the query.
(
query is None
or (
(
isinstance(query, str)
and (query in node.title or query in node.text)
)
or (
isinstance(query, re.Pattern)
and (query.search(node.title) or query.search(node.text))
)
)
)
and
# Process the func.
(func is None or func(node))
and ( # Process the labels.
labels is None
or (not labels and not node.labels.all())
or (any(node.labels.get(i) is not None for i in labels))
)
and (colors is None or node.color in colors) # Process the colors.
and (pinned is None or node.pinned == pinned) # Process the pinned state.
and ( # Process the archive state.
archived is None or node.archived == archived
)
and (trashed is None or node.trashed == trashed) # Process the trash state.
)
[docs]
def createNote(
self, title: str | None = None, text: str | None = None
) -> _node.Node:
"""Create a new managed note. Any changes to the note will be uploaded when :py:meth:`sync` is called.
Args:
title: The title of the note.
text: The text of the note.
Returns:
The new note.
"""
node = _node.Note()
if title is not None:
node.title = title
if text is not None:
node.text = text
self.add(node)
return node
[docs]
def createList(
self,
title: str | None = None,
items: list[tuple[str, bool]] | None = None,
) -> _node.List:
"""Create a new list and populate it. Any changes to the note will be uploaded when :py:meth:`sync` is called.
Args:
title: The title of the list.
items: A list of tuples. Each tuple represents the text and checked status of the listitem.
Returns:
The new list.
"""
if items is None:
items = []
node = _node.List()
if title is not None:
node.title = title
sort = random.randint(1000000000, 9999999999) # noqa: S311
for text, checked in items:
node.add(text, checked, sort)
sort -= _node.List.SORT_DELTA
self.add(node)
return node
[docs]
def createLabel(self, name: str) -> _node.Label:
"""Create a new label.
Args:
name: Label name.
Returns:
The new label.
Raises:
LabelException: If the label exists.
"""
if self.findLabel(name):
raise exception.LabelException("Label exists")
node = _node.Label()
node.name = name
self._labels[node.id] = node
return node
[docs]
def findLabel(
self, query: re.Pattern | str, create: bool = False
) -> _node.Label | None:
"""Find a label with the given name.
Args:
query: A str or regular expression to match against the name.
create: Whether to create the label if it doesn't exist (only if name is a str).
Returns:
The label.
"""
is_str = isinstance(query, str)
name = None
if is_str:
name = query
query = query.lower()
for label in self._labels.values():
# Match the label against query, which may be a str or Pattern.
if (is_str and query == label.name.lower()) or (
isinstance(query, re.Pattern) and query.search(label.name)
):
return label
return self.createLabel(name) if create and is_str else None
[docs]
def getLabel(self, label_id: str) -> _node.Label | None:
"""Get an existing label.
Args:
label_id: Label id.
Returns:
The label.
"""
return self._labels.get(label_id)
[docs]
def deleteLabel(self, label_id: str) -> None:
"""Deletes a label.
Args:
label_id: Label id.
"""
if label_id not in self._labels:
return
label = self._labels[label_id]
label.delete()
for node in self.all():
node.labels.remove(label)
[docs]
def labels(self) -> list[_node.Label]:
"""Get all labels.
Returns:
Labels
"""
return list(self._labels.values())
def __UNSTABLE_API_uploadMedia(self, fh: IO)-> None:
pass
[docs]
def all(self) -> list[_node.TopLevelNode]:
"""Get all Notes.
Returns:
All notes.
"""
return self._nodes[_node.Root.ID].children
[docs]
def sync(self, resync: bool = False) -> None:
"""Sync the local Keep tree with the server. If resyncing, local changes will be destroyed. Otherwise, local changes to notes, labels and reminders will be detected and synced up.
Args:
resync: Whether to resync data.
Raises:
SyncException: If there is a consistency issue.
"""
# Clear all state if we want to resync.
if resync:
self._clear()
# self._sync_reminders()
self._sync_notes()
if _node.DEBUG:
self._clean()
def _sync_reminders(self) -> None:
# Fetch updates until we reach the newest version.
while True:
logger.debug("Starting reminder sync: %s", self._reminder_version)
changes = self._reminders_api.list()
# Hydrate the individual "tasks".
if "task" in changes:
self._parseTasks(changes["task"])
self._reminder_version = changes["storageVersion"]
logger.debug("Finishing sync: %s", self._reminder_version)
# Check if we've reached the newest version.
history = self._reminders_api.history(self._reminder_version)
if self._reminder_version == history["highestStorageVersion"]:
break
def _sync_notes(self) -> None:
# Fetch updates until we reach the newest version.
while True:
logger.debug("Starting keep sync: %s", self._keep_version)
# Collect any changes and send them up to the server.
labels_updated = any(i.dirty for i in self._labels.values())
changes = self._keep_api.changes(
target_version=self._keep_version,
nodes=[i.save() for i in self._findDirtyNodes()],
labels=[i.save() for i in self._labels.values()]
if labels_updated
else None,
)
if changes.get("forceFullResync"):
raise exception.ResyncRequiredException("Full resync required")
if changes.get("upgradeRecommended"):
raise exception.UpgradeRecommendedException("Upgrade recommended")
# Hydrate labels.
if "userInfo" in changes:
self._parseUserInfo(changes["userInfo"])
# Hydrate notes and any children.
if "nodes" in changes:
self._parseNodes(changes["nodes"])
self._keep_version = changes["toVersion"]
logger.debug("Finishing sync: %s", self._keep_version)
# Check if there are more changes to retrieve.
if not changes["truncated"]:
break
def _parseTasks(self, raw: dict) -> None:
pass
def _parseNodes(self, raw: dict) -> None: # noqa: C901, PLR0912
created_nodes = []
deleted_nodes = []
listitem_nodes = []
# Loop over each updated node.
for raw_node in raw:
# If the id exists, then we already know about it. In other words,
# update a local node.
if raw_node["id"] in self._nodes:
node = self._nodes[raw_node["id"]]
if "parentId" in raw_node:
# If the parentId field is set, this is an update. Load it
# into the existing node.
node.load(raw_node)
self._sid_map[node.server_id] = node.id
logger.debug("Updated node: %s", raw_node["id"])
else:
# Otherwise, this node has been deleted. Add it to the list.
deleted_nodes.append(node)
else:
# Otherwise, this is a new node. Attempt to hydrate it.
node = _node.from_json(raw_node)
if node is None:
logger.debug("Discarded unknown node")
else:
# Append the new node into the node tree.
self._nodes[raw_node["id"]] = node
self._sid_map[node.server_id] = node.id
created_nodes.append(node)
logger.debug("Created node: %s", raw_node["id"])
# If the node is a listitem, keep track of it.
if isinstance(node, _node.ListItem):
listitem_nodes.append(node)
# Attach each listitem to its parent list. Indented items point to their
# parent listitem, so we need to traverse up until we reach the list.
for node in listitem_nodes:
prev = node.prev_super_list_item_id
curr = node.super_list_item_id
if prev == curr:
continue
# Apply proper indentation.
if prev is not None and prev in self._nodes:
self._nodes[prev].dedent(node, False)
if curr is not None and curr in self._nodes:
self._nodes[curr].indent(node, False)
# Attach created nodes to the tree.
for node in created_nodes:
logger.debug(
"Attached node: %s to %s",
node.id if node else None,
node.parent_id if node else None,
)
parent_node = self._nodes.get(node.parent_id)
parent_node.append(node, False)
# Detach deleted nodes from the tree.
for node in deleted_nodes:
node.parent.remove(node)
del self._nodes[node.id]
if node.server_id is not None:
del self._sid_map[node.server_id]
logger.debug("Deleted node: %s", node.id)
# Hydrate label references in notes.
for node in self.all():
for label_id in node.labels._labels: # noqa: SLF001
node.labels._labels[label_id] = self._labels.get( # noqa: SLF001
label_id
)
def _parseUserInfo(self, raw: dict) -> None:
labels = {}
if "labels" in raw:
for label in raw["labels"]:
# If the mainId field exists, this is an update.
if label["mainId"] in self._labels:
node = self._labels[label["mainId"]]
# Remove this key from our list of labels.
del self._labels[label["mainId"]]
logger.debug("Updated label: %s", label["mainId"])
else:
# Otherwise, this is a brand new label.
node = _node.Label()
logger.debug("Created label: %s", label["mainId"])
node.load(label)
labels[label["mainId"]] = node
# All remaining labels are deleted.
for label_id in self._labels:
logger.debug("Deleted label: %s", label_id)
self._labels = labels
def _findDirtyNodes(self) -> list[_node.Node]:
# Find nodes that aren't in our internal nodes list and insert them.
for node in list(self._nodes.values()):
for child in node.children:
if child.id not in self._nodes:
self._nodes[child.id] = child
# Collect all dirty nodes (any nodes from above will be caught too).
return [node for node in self._nodes.values() if node.dirty]
def _clean(self) -> None:
"""Recursively check that all nodes are reachable."""
found_ids = set()
nodes = [self._nodes[_node.Root.ID]]
# Enumerate all nodes from the root node
while nodes:
node = nodes.pop()
found_ids.add(node.id)
nodes = nodes + node.children
# Find nodes that can't be reached from the root
for node_id in self._nodes:
if node_id in found_ids:
continue
logger.error("Dangling node: %s", node_id)
# Find nodes that don't exist in the collection
for node_id in found_ids:
if node_id in self._nodes:
continue
logger.error("Unregistered node: %s", node_id)