From 5fdfb50d184ef9bab2915417b8ea180a443aa569 Mon Sep 17 00:00:00 2001 From: Anatoly Koptev Date: Fri, 6 Feb 2026 17:03:03 -0800 Subject: [PATCH] fix: PolarDB compatibility with Apache AGE 1.5+ - Add explicit type casting (properties::text::agtype) - Fix agtype_access_operator compatibility - Add debug logging for initialization - Add migration test script Fixes compatibility with Apache AGE 1.5+ strict type checking. Applied to 82 SQL query locations. Tested with: - Apache AGE 1.5.0+ - PostgreSQL 15.x - PolarDB PostgreSQL-compatible Co-Authored-By: Claude Opus 4.6 --- scripts/migrations/pr1-migration-script.sql | 75 +++++++++ src/memos/graph_dbs/polardb.py | 165 ++++++++++---------- 2 files changed, 158 insertions(+), 82 deletions(-) create mode 100644 scripts/migrations/pr1-migration-script.sql diff --git a/scripts/migrations/pr1-migration-script.sql b/scripts/migrations/pr1-migration-script.sql new file mode 100644 index 000000000..af6ad96a5 --- /dev/null +++ b/scripts/migrations/pr1-migration-script.sql @@ -0,0 +1,75 @@ +-- ============================================================================= +-- PolarDB Apache AGE Compatibility Check Script +-- ============================================================================= +-- Run this script to verify your Apache AGE version and test compatibility +-- +-- Usage: +-- psql -U postgres -d your_database -f pr1-migration-script.sql +-- ============================================================================= + +\echo '======================================================================' +\echo 'PolarDB Apache AGE Compatibility Check' +\echo '======================================================================' +\echo '' + +-- Check Apache AGE version +\echo 'Checking Apache AGE version...' +SELECT extversion AS age_version +FROM pg_extension +WHERE extname = 'age'; + +\echo '' +\echo 'Checking PostgreSQL version...' +SELECT version(); + +\echo '' +\echo '======================================================================' +\echo 'Testing agtype_access_operator compatibility' +\echo '======================================================================' + +-- Create temporary test table +DROP TABLE IF EXISTS temp_age_test; +CREATE TEMP TABLE temp_age_test ( + id TEXT PRIMARY KEY, + properties JSONB +); + +-- Insert test data +INSERT INTO temp_age_test (id, properties) VALUES + ('test_1', '{"memory_type": "memo", "user_name": "alice", "content": "Hello"}'), + ('test_2', '{"memory_type": "episode", "user_name": "bob", "content": "World"}'); + +\echo '' +\echo 'Test 1: OLD syntax (will FAIL on AGE 1.5+)' +\echo 'Query: agtype_access_operator(properties, ...)' + +SELECT COUNT(*) AS test1_result FROM ( + SELECT * + FROM temp_age_test + WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = '"memo"'::agtype +) AS subquery; + +\echo '' +\echo 'Test 2: NEW syntax (will WORK on all AGE versions)' +\echo 'Query: agtype_access_operator(properties::text::agtype, ...)' + +SELECT COUNT(*) AS test2_result FROM ( + SELECT * + FROM temp_age_test + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"memory_type"'::agtype) = '"memo"'::agtype +) AS subquery; + +\echo '' +\echo '======================================================================' +\echo 'Compatibility Test Results' +\echo '======================================================================' +\echo 'If Test 1 failed with type error: YOU NEED THIS PR' +\echo 'If Test 2 succeeded: This PR will fix your issues' +\echo '' +\echo 'Expected results:' +\echo ' AGE 1.4.x: Both tests succeed' +\echo ' AGE 1.5.0+: Test 1 fails, Test 2 succeeds' +\echo '======================================================================' + +-- Cleanup +DROP TABLE IF EXISTS temp_age_test; diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index b9c8ca84b..591caae92 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -126,6 +126,7 @@ def __init__(self, config: PolarDBGraphDBConfig): import psycopg2 import psycopg2.pool + print(f"DEBUG: PolarDBGraph init. Host={config.host}, DB={config.db_name}") self.config = config # Handle both dict and object config @@ -540,9 +541,9 @@ def get_memory_count(self, memory_type: str, user_name: str | None = None) -> in query = f""" SELECT COUNT(*) FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"memory_type"'::agtype) = %s::agtype """ - query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += "\nAND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" params = [self.format_param_value(memory_type), self.format_param_value(user_name)] # Get a connection from the pool @@ -566,9 +567,9 @@ def node_not_exist(self, scope: str, user_name: str | None = None) -> int: query = f""" SELECT id FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"memory_type"'::agtype) = %s::agtype """ - query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += "\nAND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" query += "\nLIMIT 1" params = [self.format_param_value(scope), self.format_param_value(user_name)] @@ -604,9 +605,9 @@ def remove_oldest_memory( # First find IDs to delete, then delete them select_query = f""" SELECT id FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype - AND ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = %s::agtype - ORDER BY ag_catalog.agtype_access_operator(properties, '"updated_at"'::agtype) DESC + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"memory_type"'::agtype) = %s::agtype + AND ag_catalog.agtype_access_operator(properties::text::agtype, '"user_name"'::agtype) = %s::agtype + ORDER BY ag_catalog.agtype_access_operator(properties::text::agtype, '"updated_at"'::agtype) DESC OFFSET %s """ select_params = [ @@ -688,7 +689,7 @@ def update_node(self, id: str, fields: dict[str, Any], user_name: str | None = N query = f""" UPDATE "{self.db_name}_graph"."Memory" SET properties = %s, embedding = %s - WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) = %s::agtype """ params = [ json.dumps(properties), @@ -699,13 +700,13 @@ def update_node(self, id: str, fields: dict[str, Any], user_name: str | None = N query = f""" UPDATE "{self.db_name}_graph"."Memory" SET properties = %s - WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) = %s::agtype """ params = [json.dumps(properties), self.format_param_value(id)] # Only add user filter when user_name is provided if user_name is not None: - query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += "\nAND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" params.append(self.format_param_value(user_name)) # Get a connection from the pool @@ -730,13 +731,13 @@ def delete_node(self, id: str, user_name: str | None = None) -> None: """ query = f""" DELETE FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) = %s::agtype """ params = [self.format_param_value(id)] # Only add user filter when user_name is provided if user_name is not None: - query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += "\nAND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" params.append(self.format_param_value(user_name)) # Get a connection from the pool @@ -1050,13 +1051,13 @@ def get_node( query = f""" SELECT {select_fields} FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) = %s::agtype """ params = [self.format_param_value(id)] # Only add user filter when user_name is provided if user_name is not None: - query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += "\nAND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" params.append(self.format_param_value(user_name)) logger.info(f"polardb [get_node] query: {query},params: {params}") @@ -1142,12 +1143,12 @@ def get_nodes( query = f""" SELECT id, properties, embedding FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '\"id\"'::agtype) = ANY(ARRAY[{placeholders}]::agtype[]) + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '\"id\"'::agtype) = ANY(ARRAY[{placeholders}]::agtype[]) """ # Only add user_name filter if provided if user_name is not None: - query += " AND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + query += " AND ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" params.append(self.format_param_value(user_name)) logger.info(f"get_nodes query:{query},params:{params}") @@ -1706,15 +1707,15 @@ def seach_by_keywords_like( if scope: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" ) if status: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"{status}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"{status}\"'::agtype" ) else: where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"activated\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"activated\"'::agtype" ) # Build user_name filter with knowledgebase_ids support (OR relationship) using common method @@ -1736,11 +1737,11 @@ def seach_by_keywords_like( for key, value in search_filter.items(): if isinstance(value, str): where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" ) else: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {value}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {value}::agtype" ) # Build filter conditions using common method @@ -1753,7 +1754,7 @@ def seach_by_keywords_like( query = f""" SELECT - ag_catalog.agtype_access_operator(properties, '"id"'::agtype) AS old_id, + ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) AS old_id, agtype_object_field_text(properties, 'memory') as memory_text FROM "{self.db_name}_graph"."Memory" {where_clause} @@ -1799,15 +1800,15 @@ def seach_by_keywords_tfidf( if scope: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" ) if status: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"{status}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"{status}\"'::agtype" ) else: where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"activated\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"activated\"'::agtype" ) # Build user_name filter with knowledgebase_ids support (OR relationship) using common method @@ -1829,11 +1830,11 @@ def seach_by_keywords_tfidf( for key, value in search_filter.items(): if isinstance(value, str): where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" ) else: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {value}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {value}::agtype" ) # Build filter conditions using common method @@ -1850,7 +1851,7 @@ def seach_by_keywords_tfidf( # Build fulltext search query query = f""" SELECT - ag_catalog.agtype_access_operator(properties, '"id"'::agtype) AS old_id, + ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) AS old_id, agtype_object_field_text(properties, 'memory') as memory_text FROM "{self.db_name}_graph"."Memory" {where_clause} @@ -1924,15 +1925,15 @@ def search_by_fulltext( if scope: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" ) if status: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"{status}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"{status}\"'::agtype" ) else: where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"activated\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"activated\"'::agtype" ) # Build user_name filter with knowledgebase_ids support (OR relationship) using common method @@ -1955,11 +1956,11 @@ def search_by_fulltext( for key, value in search_filter.items(): if isinstance(value, str): where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" ) else: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {value}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {value}::agtype" ) # Build filter conditions using common method @@ -1980,7 +1981,7 @@ def search_by_fulltext( # Build fulltext search query query = f""" SELECT - ag_catalog.agtype_access_operator(properties, '"id"'::agtype) AS old_id, + ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) AS old_id, agtype_object_field_text(properties, 'memory') as memory_text, ts_rank({tsvector_field}, to_tsquery('{tsquery_config}', %s)) as rank FROM "{self.db_name}_graph"."Memory" @@ -2040,15 +2041,15 @@ def search_by_embedding( where_clauses = [] if scope: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"memory_type\"'::agtype) = '\"{scope}\"'::agtype" ) if status: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"{status}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"{status}\"'::agtype" ) else: where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"activated\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"activated\"'::agtype" ) where_clauses.append("embedding is not null") # Add user_name filter like nebular.py @@ -2057,9 +2058,9 @@ def search_by_embedding( # user_name = self._get_config_value("user_name") # if not self.config.use_multi_db and user_name: # if kwargs.get("cube_name"): - # where_clauses.append(f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{kwargs['cube_name']}\"'::agtype") + # where_clauses.append(f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{kwargs['cube_name']}\"'::agtype") # else: - # where_clauses.append(f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype") + # where_clauses.append(f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype") """ # Build user_name filter with knowledgebase_ids support (OR relationship) using common method user_name_conditions = self._build_user_name_and_kb_ids_conditions_sql( @@ -2080,11 +2081,11 @@ def search_by_embedding( for key, value in search_filter.items(): if isinstance(value, str): where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{value}\"'::agtype" ) else: where_clauses.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {value}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {value}::agtype" ) # Build filter conditions using common method @@ -2100,7 +2101,7 @@ def search_by_embedding( SELECT id, properties, timeline, - ag_catalog.agtype_access_operator(properties, '"id"'::agtype) AS old_id, + ag_catalog.agtype_access_operator(properties::text::agtype, '"id"'::agtype) AS old_id, (1 - (embedding <=> %s::vector(1024))) AS scope FROM "{self.db_name}_graph"."Memory" {where_clause} @@ -2407,7 +2408,7 @@ def get_grouped_counts( user_name = user_name if user_name else self._get_config_value("user_name") # Build user clause - user_clause = f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype" + user_clause = f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype" if where_clause: where_clause = where_clause.strip() if where_clause.upper().startswith("WHERE"): @@ -2429,7 +2430,7 @@ def get_grouped_counts( if "user_name = %s" in where_clause: where_clause = where_clause.replace( "user_name = %s", - f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype", + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype", ) # Build return fields and group by fields @@ -2439,10 +2440,10 @@ def get_grouped_counts( for field in group_fields: alias = field.replace(".", "_") return_fields.append( - f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text AS {alias}" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{field}\"'::agtype)::text AS {alias}" ) group_by_fields.append( - f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{field}\"'::agtype)::text" ) # Full SQL query construction @@ -2589,11 +2590,11 @@ def export_graph( where_conditions = [] if user_name: where_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype" ) if user_id: where_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"user_id\"'::agtype) = '\"{user_id}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_id\"'::agtype) = '\"{user_id}\"'::agtype" ) # Build filter conditions using common method @@ -2628,7 +2629,7 @@ def export_graph( SELECT id, properties, embedding FROM "{self.db_name}_graph"."Memory" {where_clause} - ORDER BY ag_catalog.agtype_access_operator(properties, '"created_at"'::agtype) DESC NULLS LAST, + ORDER BY ag_catalog.agtype_access_operator(properties::text::agtype, '"created_at"'::agtype) DESC NULLS LAST, id DESC {pagination_clause} """ @@ -2637,7 +2638,7 @@ def export_graph( SELECT id, properties FROM "{self.db_name}_graph"."Memory" {where_clause} - ORDER BY ag_catalog.agtype_access_operator(properties, '"created_at"'::agtype) DESC NULLS LAST, + ORDER BY ag_catalog.agtype_access_operator(properties::text::agtype, '"created_at"'::agtype) DESC NULLS LAST, id DESC {pagination_clause} """ @@ -3827,30 +3828,30 @@ def get_neighbors_by_tag( exclude_conditions = [] for exclude_id in exclude_ids: exclude_conditions.append( - "ag_catalog.agtype_access_operator(properties, '\"id\"'::agtype) != %s::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"id\"'::agtype) != %s::agtype" ) params.append(self.format_param_value(exclude_id)) where_clauses.append(f"({' AND '.join(exclude_conditions)})") # Status filter - keep only 'activated' where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"status\"'::agtype) = '\"activated\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"status\"'::agtype) = '\"activated\"'::agtype" ) # Type filter - exclude 'reasoning' type where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"node_type\"'::agtype) != '\"reasoning\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"node_type\"'::agtype) != '\"reasoning\"'::agtype" ) # User filter where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = %s::agtype" ) params.append(self.format_param_value(user_name)) # Testing showed no data; annotate. where_clauses.append( - "ag_catalog.agtype_access_operator(properties, '\"memory_type\"'::agtype) != '\"WorkingMemory\"'::agtype" + "ag_catalog.agtype_access_operator(properties::text::agtype, '\"memory_type\"'::agtype) != '\"WorkingMemory\"'::agtype" ) where_clause = " AND ".join(where_clauses) @@ -4312,7 +4313,7 @@ def _build_user_name_and_kb_ids_conditions_sql( if effective_user_name: user_name_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{effective_user_name}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{effective_user_name}\"'::agtype" ) # Add knowledgebase_ids conditions (checking user_name field in the data) @@ -4320,7 +4321,7 @@ def _build_user_name_and_kb_ids_conditions_sql( for kb_id in knowledgebase_ids: if isinstance(kb_id, str): user_name_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{kb_id}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{kb_id}\"'::agtype" ) return user_name_conditions @@ -4768,17 +4769,17 @@ def build_filter_condition(condition_dict: dict) -> str: escaped_value = escape_sql_string(op_value) if is_datetime: condition_parts.append( - f"TRIM(BOTH '\"' FROM ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text)::timestamp {sql_op} '{escaped_value}'::timestamp" + f"TRIM(BOTH '\"' FROM ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype)::text)::timestamp {sql_op} '{escaped_value}'::timestamp" ) else: condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) {sql_op} '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) {sql_op} '\"{escaped_value}\"'::agtype" ) else: # For non-string values (numbers, booleans, etc.), convert to JSON string and then to agtype value_json = json.dumps(op_value) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) {sql_op} ag_catalog.agtype_in('{value_json}')" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) {sql_op} ag_catalog.agtype_in('{value_json}')" ) elif op == "=": # Handle equality operator @@ -4832,11 +4833,11 @@ def build_filter_condition(condition_dict: dict) -> str: # For scalar fields, use = if key in ("tags", "sources"): condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '[\"{escaped_value}\"]'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '[\"{escaped_value}\"]'::agtype" ) else: condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" ) elif isinstance(op_value, list): # For array fields, format list as JSON array string @@ -4846,24 +4847,24 @@ def build_filter_condition(condition_dict: dict) -> str: ] json_array = json.dumps(escaped_items) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '{json_array}'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '{json_array}'::agtype" ) else: # For non-string list values, convert to JSON string and then to agtype value_json = json.dumps(op_value) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" ) else: if key in ("tags", "sources"): condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '[{op_value}]'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '[{op_value}]'::agtype" ) else: # For non-string values (numbers, booleans, etc.), convert to JSON string and then to agtype value_json = json.dumps(op_value) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" ) elif op == "contains": # Handle contains operator @@ -4884,7 +4885,7 @@ def build_filter_condition(condition_dict: dict) -> str: escaped_value = escape_sql_string(str(op_value)) # For array fields, use @> with array format condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" ) elif op == "in": # Handle in operator (for checking if field value is in a list) @@ -4965,18 +4966,18 @@ def build_filter_condition(condition_dict: dict) -> str: # For array fields, use @> operator (contains) escaped_value = escape_sql_string(str(item)) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" ) else: # For scalar fields, use equality if isinstance(item, str): escaped_value = escape_sql_string(item) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" ) else: condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {item}::agtype" ) else: # Multiple values, use OR conditions @@ -4986,18 +4987,18 @@ def build_filter_condition(condition_dict: dict) -> str: # For array fields, use @> operator (contains) to check if array contains the value escaped_value = escape_sql_string(str(item)) or_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" ) else: # For scalar fields, use equality if isinstance(item, str): escaped_value = escape_sql_string(item) or_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" ) else: or_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = {item}::agtype" ) if or_conditions: condition_parts.append( @@ -5032,11 +5033,11 @@ def build_filter_condition(condition_dict: dict) -> str: .replace("_", "\\_") ) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text LIKE '%{escaped_value}%'" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype)::text LIKE '%{escaped_value}%'" ) else: condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text LIKE '%{op_value}%'" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype)::text LIKE '%{op_value}%'" ) # Check if key starts with "info." prefix (for simple equality) elif key.startswith("info."): @@ -5058,13 +5059,13 @@ def build_filter_condition(condition_dict: dict) -> str: if isinstance(value, str): escaped_value = escape_sql_string(value) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" ) else: # For non-string values (numbers, booleans, etc.), convert to JSON string and then to agtype value_json = json.dumps(value) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"{key}\"'::agtype) = ag_catalog.agtype_in('{value_json}')" ) return " AND ".join(condition_parts) @@ -5229,7 +5230,7 @@ def delete_node_by_prams( id_conditions = [] for node_id in memory_ids: id_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"id\"'::agtype) = '\"{node_id}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"id\"'::agtype) = '\"{node_id}\"'::agtype" ) where_conditions.append(f"({' OR '.join(id_conditions)})") @@ -5331,7 +5332,7 @@ def escape_memory_id(mid: str) -> str: # Escape special characters escaped_mid = escape_memory_id(mid) id_conditions.append( - f"ag_catalog.agtype_access_operator(properties, '\"id\"'::agtype) = '\"{escaped_mid}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties::text::agtype, '\"id\"'::agtype) = '\"{escaped_mid}\"'::agtype" ) where_clause = f"({' OR '.join(id_conditions)})" @@ -5339,8 +5340,8 @@ def escape_memory_id(mid: str) -> str: # Query to get memory_id and user_name pairs query = f""" SELECT - ag_catalog.agtype_access_operator(properties, '\"id\"'::agtype)::text AS memory_id, - ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype)::text AS user_name + ag_catalog.agtype_access_operator(properties::text::agtype, '\"id\"'::agtype)::text AS memory_id, + ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype)::text AS user_name FROM "{self.db_name}_graph"."Memory" WHERE {where_clause} """ @@ -5421,7 +5422,7 @@ def escape_user_name(un: str) -> str: query = f""" SELECT COUNT(*) FROM "{self.db_name}_graph"."Memory" - WHERE ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{escaped_un}\"'::agtype + WHERE ag_catalog.agtype_access_operator(properties::text::agtype, '\"user_name\"'::agtype) = '\"{escaped_un}\"'::agtype """ logger.info(f"[exist_user_name] query: {query}") result_dict = {}