diff --git a/setup.py b/setup.py
index d4d79cc..756ec54 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@ setup(
     name="monday-api-python-sdk",  # Required
-    version="1.6.0",  # Required
+    version="1.6.5",  # Required
     description="A Python SDK for interacting with Monday's GraphQL API",  # Optional
     long_description=long_description,  # Optional
     long_description_content_type="text/markdown",  # Optional (see note above)
diff --git a/src/monday_sdk/modules/boards.py b/src/monday_sdk/modules/boards.py
index f01650b..080e28f 100644
--- a/src/monday_sdk/modules/boards.py
+++ b/src/monday_sdk/modules/boards.py
@@ -2,14 +2,20 @@
 from ..graphql_handler import MondayGraphQL
 from ..query_templates import get_boards_query, get_board_by_id_query, get_board_items_query, get_columns_by_board_query
-from ..types import MondayApiResponse, Item, ItemsPage, BoardKind, BoardState, BoardsOrderBy
+from ..types import MondayApiResponse, Item, BoardKind, BoardState, BoardsOrderBy, Operator, ItemsOrderByDirection
 from ..utils import sleep_according_to_complexity, construct_updated_at_query_params
 from ..constants import DEFAULT_PAGE_LIMIT_BOARDS, DEFAULT_PAGE_LIMIT_ITEMS
 
 
+def is_cursor_expired_error(error: Exception) -> bool:
+    """Check if the exception is a CursorExpiredError."""
+    return "CursorExpiredError" in str(error)
+
+
 class BoardModule:
     def __init__(self, graphql_client: MondayGraphQL):
         self.client = graphql_client
 
+
     def fetch_boards(
         self,
         limit: Optional[int] = DEFAULT_PAGE_LIMIT_BOARDS,
@@ -31,11 +37,23 @@ def fetch_all_items_by_board_id(
         board_id: Union[int, str],
         query_params: Optional[Mapping[str, Any]] = None,
         limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
+        is_large_board: bool = False,
     ) -> List[Item]:
         """
         Fetches all items from a board by board ID, includes paginating
         todo: add support for multiple board IDs
         """
+        if is_large_board:
+            return self._fetch_all_items_large_board(board_id, query_params, limit)
+        return self._fetch_all_items(board_id, query_params, limit)
+
+    def _fetch_all_items(
+        self,
+        board_id: Union[int, str],
+        query_params: Optional[Mapping[str, Any]] = None,
+        limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
+    ) -> List[Item]:
+        """Internal method for standard item fetching with pagination."""
         items: List[Item] = []
         cursor = None
 
@@ -55,12 +73,114 @@ def fetch_all_items_by_board_id(
 
         return items
 
+    def _fetch_all_items_large_board(
+        self,
+        board_id: Union[int, str],
+        query_params: Optional[Mapping[str, Any]] = None,
+        limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
+    ) -> List[Item]:
+        """
+        Fetches all items from a board by board ID, with cursor expiration handling.
+        Uses updated_at tracking to recover from CursorExpiredError and continue fetching.
+        Suitable for large boards where the cursor might expire during pagination.
+
+        When a cursor expires, the method rebuilds the query using the last known
+        updated_at timestamp to continue from where it left off.
+
+        Note: A custom order_by in query_params is not supported. This method always
+        orders by __last_updated__ ascending to ensure correct cursor recovery.
+ """ + items: List[Item] = [] + cursor = None + last_updated_at: Optional[str] = None + last_updated_at_to_use = None + page = 0 + while True: + # Build query params with updated_at filter if recovering from cursor expiration + print(f"Fetching page {page} with cursor: {cursor} and last_updated_at: {last_updated_at_to_use}") + effective_query_params = self._merge_query_params_with_updated_at(query_params, last_updated_at_to_use) + last_updated_at_to_use = None + try: + query = get_board_items_query(board_id, query_params=effective_query_params, cursor=cursor, limit=limit) + response = self.client.execute(query) + items_page = response.data.boards[0].items_page if cursor is None else response.data.next_items_page + + # Track the last updated_at for potential recovery + if items_page.items: + last_updated_at = items_page.items[-1].updated_at + + items.extend(items_page.items) + complexity = response.data.complexity.query + cursor = items_page.cursor + if not cursor: + break + sleep_according_to_complexity(complexity) + page += 1 + + except Exception as e: + if is_cursor_expired_error(e): + print(f"Cursor expired - resetting cursor and using updated_at filter to continue") + cursor = None + last_updated_at_to_use = last_updated_at + continue + raise e + + return items + + def _merge_query_params_with_updated_at( + self, + query_params: Optional[Mapping[str, Any]], + updated_after: Optional[str], + ) -> Mapping[str, Any]: + """ + Merges existing query_params with an updated_at filter and order_by clause. + Always includes order_by to sort by updated_at ascending. + If updated_after is provided, adds a filter to fetch items updated after that timestamp. + + Note: If updated_after is provided, any existing rules with GREATER_THAN_OR_EQUALS + on __last_updated__ will be removed to avoid conflicting filters. + """ + merged_params = dict(query_params) if query_params else {} + merged_params["order_by"] = [{"column_id": "__last_updated__", "direction": ItemsOrderByDirection.ASC}] + + if updated_after is not None: + # Remove existing greater-than rules on __last_updated__ to avoid conflicts + existing_rules = list(merged_params.get("rules", [])) + filtered_rules = [ + rule for rule in existing_rules + if not self._is_last_updated_greater_than_rule(rule) + ] + + updated_at_rule = { + "column_id": "__last_updated__", + "compare_value": ["EXACT", updated_after], + "operator": Operator.GREATER_THAN_OR_EQUALS, + "compare_attribute": "UPDATED_AT", + } + filtered_rules.append(updated_at_rule) + merged_params["rules"] = filtered_rules + + return merged_params + + def _is_last_updated_greater_than_rule(self, rule: Mapping[str, Any]) -> bool: + """ + Check if a rule is a greater-than or greater-than-or-equals filter on __last_updated__. 
+ """ + if rule.get("column_id") != "__last_updated__": + return False + operator = rule.get("operator") + # Handle both enum and string values + if isinstance(operator, Operator): + return operator == Operator.GREATER_THAN_OR_EQUALS + return operator == Operator.GREATER_THAN_OR_EQUALS.value + def fetch_item_by_board_id_by_update_date( self, board_id: Union[int, str], updated_after: str, updated_before: str, limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS, + is_large_board: bool = False, ) -> List[Item]: """ Fetches items from a board by board ID by update date, useful for incremental fetching @@ -71,7 +191,7 @@ def fetch_item_by_board_id_by_update_date( "Either updated_after or updated_before must be provided when fetching items by update date" ) query_params = construct_updated_at_query_params(updated_after, updated_before) - return self.fetch_all_items_by_board_id(board_id, query_params=query_params, limit=limit) + return self.fetch_all_items_by_board_id(board_id, query_params=query_params, limit=limit, is_large_board=is_large_board) def fetch_columns_by_board_id(self, board_id: Union[int, str]) -> MondayApiResponse: query = get_columns_by_board_query(board_id) diff --git a/src/monday_sdk/types/__init__.py b/src/monday_sdk/types/__init__.py index 39470f4..dd55782 100644 --- a/src/monday_sdk/types/__init__.py +++ b/src/monday_sdk/types/__init__.py @@ -15,5 +15,5 @@ DocumentBlock, Document, ) -from .monday_enums import BoardKind, BoardState, BoardsOrderBy, Operator +from .monday_enums import BoardKind, BoardState, BoardsOrderBy, Operator, ItemsOrderByDirection from .file_input import FileInput diff --git a/src/monday_sdk/types/monday_enums.py b/src/monday_sdk/types/monday_enums.py index e2d1cf2..a717c1c 100644 --- a/src/monday_sdk/types/monday_enums.py +++ b/src/monday_sdk/types/monday_enums.py @@ -61,3 +61,10 @@ class BoardsOrderBy(Enum): CREATED_AT = "created_at" USED_AT = "used_at" + + +class ItemsOrderByDirection(Enum): + """Direction for ordering items""" + + ASC = "asc" + DESC = "desc"