Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="monday-api-python-sdk", # Required
version="1.6.0", # Required
version="1.6.5", # Required
description="A Python SDK for interacting with Monday's GraphQL API", # Optional
long_description=long_description, # Optional
long_description_content_type="text/markdown", # Optional (see note above)
Expand Down
124 changes: 122 additions & 2 deletions src/monday_sdk/modules/boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

from ..graphql_handler import MondayGraphQL
from ..query_templates import get_boards_query, get_board_by_id_query, get_board_items_query, get_columns_by_board_query
from ..types import MondayApiResponse, Item, ItemsPage, BoardKind, BoardState, BoardsOrderBy
from ..types import MondayApiResponse, Item, BoardKind, BoardState, BoardsOrderBy, Operator, ItemsOrderByDirection
from ..utils import sleep_according_to_complexity, construct_updated_at_query_params
from ..constants import DEFAULT_PAGE_LIMIT_BOARDS, DEFAULT_PAGE_LIMIT_ITEMS


def is_cursor_expired_error(error: Exception) -> bool:
    """Return True when *error*'s message indicates Monday's CursorExpiredError."""
    message = str(error)
    return "CursorExpiredError" in message


class BoardModule:
    """Board-centric operations (fetching boards, items, and columns) executed through a Monday GraphQL client."""

    def __init__(self, graphql_client: MondayGraphQL):
        # Client used to execute every GraphQL query issued by this module.
        self.client = graphql_client

def fetch_boards(
self,
limit: Optional[int] = DEFAULT_PAGE_LIMIT_BOARDS,
Expand All @@ -31,11 +37,23 @@ def fetch_all_items_by_board_id(
board_id: Union[int, str],
query_params: Optional[Mapping[str, Any]] = None,
limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
is_large_board: bool = False,
) -> List[Item]:
"""
Fetches all items from a board by board ID, includes paginating
todo: add support for multiple board IDs
"""
if is_large_board:
return self._fetch_all_items_large_board(board_id, query_params, limit)
return self._fetch_all_items(board_id, query_params, limit)

def _fetch_all_items(
self,
board_id: Union[int, str],
query_params: Optional[Mapping[str, Any]] = None,
limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
) -> List[Item]:
"""Internal method for standard item fetching with pagination."""
items: List[Item] = []
cursor = None

Expand All @@ -55,12 +73,114 @@ def fetch_all_items_by_board_id(

return items

def _fetch_all_items_large_board(
self,
board_id: Union[int, str],
query_params: Optional[Mapping[str, Any]] = None,
limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
) -> List[Item]:
"""
Fetches all items from a board by board ID, with cursor expiration handling.
Uses updated_at tracking to recover from CursorExpiredError and continue fetching.
Suitable for large boards where cursor might expire during pagination.

When a cursor expires, the function rebuilds the query using the last known
updated_at timestamp to continue from where it left off.

Note: Custom order_by in query_params is not supported. This function always
orders by __last_updated__ ascending to ensure correct cursor recovery.
"""
items: List[Item] = []
cursor = None
last_updated_at: Optional[str] = None
last_updated_at_to_use = None
page = 0
while True:
# Build query params with updated_at filter if recovering from cursor expiration
print(f"Fetching page {page} with cursor: {cursor} and last_updated_at: {last_updated_at_to_use}")
effective_query_params = self._merge_query_params_with_updated_at(query_params, last_updated_at_to_use)
last_updated_at_to_use = None
try:
query = get_board_items_query(board_id, query_params=effective_query_params, cursor=cursor, limit=limit)
response = self.client.execute(query)
items_page = response.data.boards[0].items_page if cursor is None else response.data.next_items_page

# Track the last updated_at for potential recovery
if items_page.items:
last_updated_at = items_page.items[-1].updated_at

items.extend(items_page.items)
complexity = response.data.complexity.query
cursor = items_page.cursor
if not cursor:
break
sleep_according_to_complexity(complexity)
page += 1

except Exception as e:
if is_cursor_expired_error(e):
print(f"Cursor expired - resetting cursor and using updated_at filter to continue")
cursor = None
last_updated_at_to_use = last_updated_at
continue
raise e

return items

def _merge_query_params_with_updated_at(
self,
query_params: Optional[Mapping[str, Any]],
updated_after: Optional[str],
) -> Mapping[str, Any]:
"""
Merges existing query_params with an updated_at filter and order_by clause.
Always includes order_by to sort by updated_at ascending.
If updated_after is provided, adds a filter to fetch items updated after that timestamp.

Note: If updated_after is provided, any existing rules with GREATER_THAN_OR_EQUALS
on __last_updated__ will be removed to avoid conflicting filters.
"""
merged_params = dict(query_params) if query_params else {}
merged_params["order_by"] = [{"column_id": "__last_updated__", "direction": ItemsOrderByDirection.ASC}]

if updated_after is not None:
# Remove existing greater-than rules on __last_updated__ to avoid conflicts
existing_rules = list(merged_params.get("rules", []))
filtered_rules = [
rule for rule in existing_rules
if not self._is_last_updated_greater_than_rule(rule)
]

updated_at_rule = {
"column_id": "__last_updated__",
"compare_value": ["EXACT", updated_after],
"operator": Operator.GREATER_THAN_OR_EQUALS,
"compare_attribute": "UPDATED_AT",
}
filtered_rules.append(updated_at_rule)
merged_params["rules"] = filtered_rules

return merged_params

    def _is_last_updated_greater_than_rule(self, rule: Mapping[str, Any]) -> bool:
        """
        Return True if *rule* filters ``__last_updated__`` with the
        GREATER_THAN_OR_EQUALS operator (given either as the enum member or
        as its raw string value).

        NOTE(review): despite the name, a plain greater-than operator is NOT
        matched here — confirm whether it should be.
        """
        if rule.get("column_id") != "__last_updated__":
            return False
        operator = rule.get("operator")
        # Handle both enum and string values
        if isinstance(operator, Operator):
            return operator == Operator.GREATER_THAN_OR_EQUALS
        return operator == Operator.GREATER_THAN_OR_EQUALS.value

def fetch_item_by_board_id_by_update_date(
self,
board_id: Union[int, str],
updated_after: str,
updated_before: str,
limit: Optional[int] = DEFAULT_PAGE_LIMIT_ITEMS,
is_large_board: bool = False,
) -> List[Item]:
"""
Fetches items from a board by board ID by update date, useful for incremental fetching
Expand All @@ -71,7 +191,7 @@ def fetch_item_by_board_id_by_update_date(
"Either updated_after or updated_before must be provided when fetching items by update date"
)
query_params = construct_updated_at_query_params(updated_after, updated_before)
return self.fetch_all_items_by_board_id(board_id, query_params=query_params, limit=limit)
return self.fetch_all_items_by_board_id(board_id, query_params=query_params, limit=limit, is_large_board=is_large_board)

def fetch_columns_by_board_id(self, board_id: Union[int, str]) -> MondayApiResponse:
query = get_columns_by_board_query(board_id)
Expand Down
2 changes: 1 addition & 1 deletion src/monday_sdk/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
DocumentBlock,
Document,
)
from .monday_enums import BoardKind, BoardState, BoardsOrderBy, Operator
from .monday_enums import BoardKind, BoardState, BoardsOrderBy, Operator, ItemsOrderByDirection
from .file_input import FileInput
7 changes: 7 additions & 0 deletions src/monday_sdk/types/monday_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ class BoardsOrderBy(Enum):

CREATED_AT = "created_at"
USED_AT = "used_at"


class ItemsOrderByDirection(Enum):
    """Direction for ordering items"""

    # Sort ascending (lowest/oldest first).
    ASC = "asc"
    # Sort descending (highest/newest first).
    DESC = "desc"
Loading