From 1dd39257c369ebbd53bd1f82a594c23355e359a2 Mon Sep 17 00:00:00 2001 From: SentienceDEV Date: Fri, 20 Feb 2026 19:01:45 -0800 Subject: [PATCH] non-web state_hash canonicalization --- predicate_contracts/__init__.py | 54 +- predicate_contracts/canonicalization.py | 633 ++++++++++++++++++++++++ tests/test_canonicalization.py | 321 ++++++++++++ 3 files changed, 1005 insertions(+), 3 deletions(-) create mode 100644 predicate_contracts/canonicalization.py create mode 100644 tests/test_canonicalization.py diff --git a/predicate_contracts/__init__.py b/predicate_contracts/__init__.py index d453dbf..8e1dbab 100644 --- a/predicate_contracts/__init__.py +++ b/predicate_contracts/__init__.py @@ -1,3 +1,25 @@ +from predicate_contracts.canonicalization import ( # Types; Utility functions; Terminal canonicalization; Desktop canonicalization + DESKTOP_SCHEMA_VERSION, + TERMINAL_SCHEMA_VERSION, + CanonicalAccessibilityNode, + CanonicalDesktopSnapshot, + CanonicalTerminalSnapshot, + build_focused_path, + canonicalize_accessibility_node, + canonicalize_desktop_snapshot, + canonicalize_terminal_snapshot, + compute_desktop_state_hash, + compute_terminal_state_hash, + hash_environment, + is_secret_key, + normalize_command, + normalize_path, + normalize_text, + normalize_timestamps, + normalize_transcript, + sha256, + strip_ansi, +) from predicate_contracts.models import ( ActionRequest, ActionSpec, @@ -21,6 +43,7 @@ ) __all__ = [ + # Models "ActionRequest", "ActionSpec", "AuthorizationDecision", @@ -32,10 +55,35 @@ "ProofEvent", "SignedMandate", "StateEvidence", - "StateEvidenceProvider", - "TraceEmitter", "VerificationEvidence", - "VerificationEvidenceProvider", "VerificationSignal", "VerificationStatus", + # Protocols + "StateEvidenceProvider", + "TraceEmitter", + "VerificationEvidenceProvider", + # Canonicalization types + "CanonicalTerminalSnapshot", + "CanonicalAccessibilityNode", + "CanonicalDesktopSnapshot", + # Canonicalization utilities + "normalize_text", + 
"normalize_command", + "strip_ansi", + "normalize_timestamps", + "normalize_transcript", + "normalize_path", + "is_secret_key", + "hash_environment", + "sha256", + # Terminal canonicalization + "canonicalize_terminal_snapshot", + "compute_terminal_state_hash", + "TERMINAL_SCHEMA_VERSION", + # Desktop canonicalization + "canonicalize_accessibility_node", + "build_focused_path", + "canonicalize_desktop_snapshot", + "compute_desktop_state_hash", + "DESKTOP_SCHEMA_VERSION", ] diff --git a/predicate_contracts/canonicalization.py b/predicate_contracts/canonicalization.py new file mode 100644 index 0000000..84cbb11 --- /dev/null +++ b/predicate_contracts/canonicalization.py @@ -0,0 +1,633 @@ +""" +Canonicalization utilities for non-web state evidence. + +This module provides consistent normalization for terminal and desktop +accessibility snapshots, ensuring reproducible state hashes across +different runs, platforms, and environments. + +Example: + >>> from predicate_contracts.canonicalization import ( + ... canonicalize_terminal_snapshot, + ... compute_terminal_state_hash, + ... ) + >>> snapshot = { + ... "session_id": "sess-123", + ... "cwd": "~/projects/myapp", + ... "command": "npm test", # Extra whitespace normalized + ... "transcript": "\\x1b[32mPASS\\x1b[0m all tests", # ANSI stripped + ... 
} + >>> state_hash = compute_terminal_state_hash(snapshot) +""" + +from __future__ import annotations + +import hashlib +import json +import os +import re +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Literal + +# ============================================================================= +# Types +# ============================================================================= + +Platform = Literal["darwin", "linux", "win32"] + +# ============================================================================= +# Text Normalization +# ============================================================================= + + +def normalize_text(text: str | None, max_len: int = 80) -> str: + """ + Normalize text for canonical comparison. + + Transforms: + - Trims leading/trailing whitespace + - Collapses internal whitespace to single spaces + - Lowercases + - Caps length + + Args: + text: Input text (may be None) + max_len: Maximum length to retain (default: 80) + + Returns: + Normalized text string (empty string if input is None) + + Examples: + >>> normalize_text(" Hello World ") + 'hello world' + >>> normalize_text(None) + '' + """ + if not text: + return "" + + # Trim and collapse whitespace + normalized = " ".join(text.split()) + # Lowercase + normalized = normalized.lower() + # Cap length + if len(normalized) > max_len: + normalized = normalized[:max_len] + return normalized + + +def normalize_command(cmd: str | None) -> str: + """ + Normalize a command string. + + Unlike normalize_text, this preserves case (commands are case-sensitive) + but still trims and collapses whitespace. 
+ + Args: + cmd: Command string + + Returns: + Normalized command + """ + if not cmd: + return "" + + # Trim and collapse whitespace only (preserve case) + return " ".join(cmd.split()) + + +# ============================================================================= +# ANSI Escape Code Handling +# ============================================================================= + +# ANSI escape sequence pattern +# Matches color codes, cursor movement, and terminal control sequences +ANSI_PATTERN = re.compile(r"\x1b\[[0-9;]*[a-zA-Z]") + + +def strip_ansi(text: str) -> str: + """ + Remove all ANSI escape sequences from text. + + Handles: + - Color codes: \\x1b[31m (red), \\x1b[0m (reset) + - Cursor movement: \\x1b[2J (clear screen) + - Terminal control sequences + + Args: + text: Text potentially containing ANSI codes + + Returns: + Text with ANSI codes removed + """ + return ANSI_PATTERN.sub("", text) + + +# ============================================================================= +# Timestamp Normalization +# ============================================================================= + +# Common timestamp patterns to normalize +TIMESTAMP_PATTERNS = [ + re.compile(r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?"), # ISO 8601 + re.compile(r"\d{2}:\d{2}:\d{2}"), # HH:MM:SS + re.compile(r"\[\d+\.\d+s\]"), # Duration [1.23s] +] + + +def normalize_timestamps(text: str) -> str: + """ + Replace common timestamp patterns with placeholder. + + This ensures that transcript hashes remain stable across runs + even when timestamps differ. 
+ + Args: + text: Text potentially containing timestamps + + Returns: + Text with timestamps replaced by [TIMESTAMP] + """ + result = text + for pattern in TIMESTAMP_PATTERNS: + result = pattern.sub("[TIMESTAMP]", result) + return result + + +# ============================================================================= +# Transcript Normalization +# ============================================================================= + +# Maximum transcript length in bytes (10KB) +MAX_TRANSCRIPT_LENGTH = 10 * 1024 + + +def normalize_transcript(transcript: str | None) -> str: + """ + Normalize a terminal transcript for canonical hashing. + + Steps: + 1. Strip ANSI escape codes + 2. Normalize timestamps + 3. For each line: trim trailing whitespace, collapse internal whitespace + 4. Remove empty trailing lines + 5. Cap total length + + Args: + transcript: Raw terminal transcript + + Returns: + Normalized transcript + """ + if not transcript: + return "" + + # Strip ANSI codes first + normalized = strip_ansi(transcript) + + # Normalize timestamps + normalized = normalize_timestamps(normalized) + + # Process line by line + lines = [] + for line in normalized.split("\n"): + # Trim trailing whitespace + processed = line.rstrip() + # Collapse internal whitespace (tabs -> space, multiple spaces -> single) + processed = re.sub(r"\t", " ", processed) + processed = re.sub(r" +", " ", processed) + lines.append(processed) + + # Remove empty trailing lines + while lines and lines[-1] == "": + lines.pop() + + # Join and cap length + result = "\n".join(lines) + if len(result) > MAX_TRANSCRIPT_LENGTH: + result = result[:MAX_TRANSCRIPT_LENGTH] + + return result + + +# ============================================================================= +# Path Normalization +# ============================================================================= + + +def normalize_path(input_path: str | None) -> str: + """ + Normalize a file system path for canonical hashing. 
+
+    Handles:
+    - Home directory expansion (~ on Unix, %USERPROFILE% on Windows)
+    - Resolution of . and ..
+    - Conversion to absolute path
+    - Lowercase drive letter on Windows
+
+    Note: Symlink resolution is not performed (would require filesystem access).
+
+    Args:
+        input_path: Path to normalize
+
+    Returns:
+        Normalized absolute path in OS-native format
+    """
+    if not input_path:
+        return ""
+
+    normalized = input_path
+
+    # Expand home directory (cross-platform)
+    if normalized.startswith("~"):
+        # Unix/macOS: ~/foo -> /Users/name/foo (NOTE(review): "~user" forms are not supported; "~bob" would mis-expand to $HOME + "bob" -- confirm callers only pass bare "~")
+        home = os.environ.get("HOME", "")
+        normalized = normalized.replace("~", home, 1)
+    elif "%USERPROFILE%" in normalized:
+        # Windows: %USERPROFILE%\foo -> C:\Users\name\foo
+        user_profile = os.environ.get("USERPROFILE", "")
+        normalized = re.sub(r"%USERPROFILE%", user_profile, normalized, flags=re.IGNORECASE)
+
+    # Resolve . and .. (uses OS-native separators)
+    path_obj = Path(normalized)
+    try:
+        # Convert to absolute if relative
+        if not path_obj.is_absolute():
+            path_obj = Path.cwd() / path_obj
+        # Resolve . and ..
+ normalized = str(path_obj.resolve()) + except (OSError, ValueError): + # If resolution fails, just normalize the path syntax + normalized = os.path.normpath(normalized) + + # Windows: lowercase drive letter for consistency (C: -> c:) + if sys.platform == "win32" and len(normalized) >= 2 and normalized[1] == ":": + normalized = normalized[0].lower() + normalized[1:] + + return normalized + + +# ============================================================================= +# Environment Variable Hashing +# ============================================================================= + +# Patterns that indicate an environment variable contains a secret +SECRET_PATTERNS = [ + re.compile(r"^(AWS_|AZURE_|GCP_|GOOGLE_)", re.IGNORECASE), # Cloud providers + re.compile(r"(_KEY|_SECRET|_TOKEN|_PASSWORD)$", re.IGNORECASE), # Common suffixes + re.compile(r"^(API_KEY|AUTH_TOKEN|PRIVATE_KEY)$", re.IGNORECASE), # Common names + re.compile(r"^(DATABASE_URL|REDIS_URL)$", re.IGNORECASE), # Connection strings +] + + +def is_secret_key(key: str) -> bool: + """ + Check if an environment variable key indicates a secret value. + + Args: + key: Environment variable name + + Returns: + True if the key matches a secret pattern + """ + return any(p.search(key) for p in SECRET_PATTERNS) + + +def hash_environment(env: dict[str, str] | None) -> str: + """ + Hash environment variables for canonical representation. 
+
+    - Redacts values for keys matching secret patterns
+    - Sorts keys for determinism
+    - Returns SHA-256 hash of canonical representation
+
+    Args:
+        env: Environment variables
+
+    Returns:
+        SHA-256 hash of canonical env representation
+    """
+    if not env:
+        return sha256("")
+
+    # Redact secret values in place (keys are kept, so the env's shape still affects the hash)
+    safe_env: dict[str, str] = {}
+    for key, value in env.items():
+        if is_secret_key(key):
+            safe_env[key] = "[REDACTED]"
+        else:
+            safe_env[key] = value
+
+    # Sort keys for determinism
+    sorted_keys = sorted(safe_env.keys())
+    canonical = "\n".join(f"{k}={safe_env[k]}" for k in sorted_keys)
+
+    return sha256(canonical)
+
+
+# =============================================================================
+# Hashing
+# =============================================================================
+
+
+def sha256(input_str: str) -> str:
+    """
+    Compute SHA-256 hash of input string.
+
+    Args:
+        input_str: String to hash
+
+    Returns:
+        Hex-encoded SHA-256 hash
+    """
+    return hashlib.sha256(input_str.encode("utf-8")).hexdigest()
+
+
+# =============================================================================
+# Terminal Session Canonicalization
+# =============================================================================
+
+# Schema version for terminal canonicalization
+TERMINAL_SCHEMA_VERSION = "terminal:v1.0"  # NOTE(review): exported and tested but never folded into compute_terminal_state_hash -- confirm a version bump is meant to leave hashes unchanged
+
+
+@dataclass(frozen=True)
+class CanonicalTerminalSnapshot:
+    """Canonical terminal snapshot with normalized fields."""
+
+    session_id: str
+    terminal_id: str
+    cwd_normalized: str
+    command_normalized: str
+    transcript_normalized: str
+    exit_code: int | None
+    env_hash: str
+    platform: Platform
+
+
+def detect_platform() -> Platform:
+    """Detect the current platform."""
+    platform = sys.platform
+    if platform in ("darwin", "linux", "win32"):
+        return platform  # type: ignore[return-value]
+    # Default to linux for unknown Unix-like platforms
+    return "linux"
+
+
+def canonicalize_terminal_snapshot(snapshot: dict[str, Any]) -> CanonicalTerminalSnapshot:
+    """
+    Canonicalize a terminal session snapshot.
+
+    Normalizes all fields to produce a deterministic representation:
+    - cwd: Resolved to absolute path
+    - command: Trimmed and whitespace-collapsed (case preserved)
+    - transcript: ANSI stripped, timestamps normalized, whitespace collapsed
+    - env: Sorted, secrets redacted, then hashed
+
+    Args:
+        snapshot: Raw terminal session snapshot dict
+
+    Returns:
+        Canonical snapshot for hashing
+    """
+    platform = snapshot.get("platform") or detect_platform()
+
+    return CanonicalTerminalSnapshot(
+        session_id=snapshot.get("session_id", ""),  # NOTE(review): unlike terminal_id below, an explicit None value propagates as None -- confirm or add `or ""`
+        terminal_id=snapshot.get("terminal_id", "") or "",
+        cwd_normalized=normalize_path(snapshot.get("cwd")),
+        command_normalized=normalize_command(snapshot.get("command")),
+        transcript_normalized=normalize_transcript(snapshot.get("transcript")),
+        exit_code=snapshot.get("exit_code"),
+        env_hash=hash_environment(snapshot.get("env")),
+        platform=platform,
+    )
+
+
+def compute_terminal_state_hash(snapshot: dict[str, Any] | CanonicalTerminalSnapshot) -> str:
+    """
+    Compute state hash for a terminal session snapshot.
+
+    The hash includes all canonical fields in a deterministic order.
+    Platform is included because different platforms have different
+    security contexts (e.g., Unix vs Windows permissions).
+ + Args: + snapshot: Raw or canonical terminal snapshot + + Returns: + SHA-256 hash prefixed with "sha256:" + """ + # Canonicalize if not already canonical + if isinstance(snapshot, CanonicalTerminalSnapshot): + canonical = snapshot + else: + canonical = canonicalize_terminal_snapshot(snapshot) + + # Build deterministic JSON (sorted keys) + hash_input = json.dumps( + { + "command_normalized": canonical.command_normalized, + "cwd_normalized": canonical.cwd_normalized, + "env_hash": canonical.env_hash, + "exit_code": canonical.exit_code, + "platform": canonical.platform, + "session_id": canonical.session_id, + "terminal_id": canonical.terminal_id, + "transcript_normalized": canonical.transcript_normalized, + }, + sort_keys=True, + ) + + return f"sha256:{sha256(hash_input)}" + + +# ============================================================================= +# Desktop Accessibility Canonicalization +# ============================================================================= + +# Schema version for desktop canonicalization +DESKTOP_SCHEMA_VERSION = "desktop:v1.0" + +# Maximum depth for UI tree canonicalization +MAX_TREE_DEPTH = 10 + +# Maximum children per node +MAX_CHILDREN_PER_NODE = 50 + +# Maximum length for window title +MAX_WINDOW_TITLE_LENGTH = 100 + + +@dataclass(frozen=True) +class CanonicalAccessibilityNode: + """Canonical accessibility node with normalized fields.""" + + role: str + name_norm: str + children: tuple[CanonicalAccessibilityNode, ...] + + +@dataclass(frozen=True) +class CanonicalDesktopSnapshot: + """Canonical desktop snapshot with normalized fields.""" + + app_name_norm: str + window_title_norm: str + focused_path: str + tree_hash: str + platform: Platform + + +def canonicalize_accessibility_node( + node: dict[str, Any] | None, + depth: int = 0, +) -> CanonicalAccessibilityNode: + """ + Canonicalize an accessibility tree node. 
+ + Normalizes: + - role: Lowercase, trimmed + - name: Text normalization (whitespace, case, length) + - children: Recursively canonicalized, sorted by (role, name) + + Ignores transient attributes: pid, position, focused, selected. + + Args: + node: Raw accessibility node + depth: Current depth (for truncation) + + Returns: + Canonical node + """ + if not node: + return CanonicalAccessibilityNode(role="", name_norm="", children=()) + + role = (node.get("role") or "").lower().strip() + name_norm = normalize_text(node.get("name")) + + # Truncate at max depth + if depth >= MAX_TREE_DEPTH: + return CanonicalAccessibilityNode(role=role, name_norm=name_norm, children=()) + + # Canonicalize children + children: list[CanonicalAccessibilityNode] = [] + raw_children = node.get("children") + if raw_children and isinstance(raw_children, list): + # Limit children count + limited_children = raw_children[:MAX_CHILDREN_PER_NODE] + + # Canonicalize each child + children = [canonicalize_accessibility_node(child, depth + 1) for child in limited_children] + + # Sort children by (role, name_norm) for determinism + children.sort(key=lambda c: (c.role, c.name_norm)) + + return CanonicalAccessibilityNode(role=role, name_norm=name_norm, children=tuple(children)) + + +def build_focused_path(focused_role: str | None = None, focused_name: str | None = None) -> str: + """ + Build a focused element path string. + + Creates a path like "button[save]" representing the focused element. 
+ + Args: + focused_role: Role of the focused element + focused_name: Name of the focused element + + Returns: + Path string + """ + role = (focused_role or "").lower().strip() + name = normalize_text(focused_name) + + if not role and not name: + return "" + + if not name: + return role + + return f"{role}[{name}]" + + +def _canonical_node_to_dict(node: CanonicalAccessibilityNode) -> dict[str, Any]: + """Convert canonical node to dict for JSON serialization.""" + return { + "role": node.role, + "name_norm": node.name_norm, + "children": [_canonical_node_to_dict(c) for c in node.children], + } + + +def canonicalize_desktop_snapshot(snapshot: dict[str, Any]) -> CanonicalDesktopSnapshot: + """ + Canonicalize a desktop accessibility snapshot. + + Normalizes all fields to produce a deterministic representation: + - app_name: Lowercase, trimmed + - window_title: Text normalization (capped at 100 chars) + - focused_path: Built from focused element info + - tree_hash: SHA-256 of canonical tree JSON + + Args: + snapshot: Raw desktop accessibility snapshot + + Returns: + Canonical snapshot for hashing + """ + platform = snapshot.get("platform") or detect_platform() + + # Canonicalize the UI tree if present + if snapshot.get("ui_tree"): + canonical_tree = canonicalize_accessibility_node(snapshot["ui_tree"]) + tree_hash = sha256(json.dumps(_canonical_node_to_dict(canonical_tree), sort_keys=True)) + elif snapshot.get("ui_tree_text"): + # Fallback: hash the raw text if no structured tree + tree_hash = sha256(normalize_text(snapshot["ui_tree_text"], 10000)) + else: + tree_hash = sha256("") + + return CanonicalDesktopSnapshot( + app_name_norm=normalize_text(snapshot.get("app_name")), + window_title_norm=normalize_text(snapshot.get("window_title"), MAX_WINDOW_TITLE_LENGTH), + focused_path=build_focused_path(snapshot.get("focused_role"), snapshot.get("focused_name")), + tree_hash=tree_hash, + platform=platform, + ) + + +def compute_desktop_state_hash(snapshot: dict[str, Any] | 
CanonicalDesktopSnapshot) -> str: + """ + Compute state hash for a desktop accessibility snapshot. + + The hash includes all canonical fields in a deterministic order. + Platform is included because different platforms have different + accessibility APIs and security contexts. + + Args: + snapshot: Raw or canonical desktop snapshot + + Returns: + SHA-256 hash prefixed with "sha256:" + """ + # Canonicalize if not already canonical + if isinstance(snapshot, CanonicalDesktopSnapshot): + canonical = snapshot + else: + canonical = canonicalize_desktop_snapshot(snapshot) + + # Build deterministic JSON (sorted keys) + hash_input = json.dumps( + { + "app_name_norm": canonical.app_name_norm, + "focused_path": canonical.focused_path, + "platform": canonical.platform, + "tree_hash": canonical.tree_hash, + "window_title_norm": canonical.window_title_norm, + }, + sort_keys=True, + ) + + return f"sha256:{sha256(hash_input)}" diff --git a/tests/test_canonicalization.py b/tests/test_canonicalization.py new file mode 100644 index 0000000..da07e19 --- /dev/null +++ b/tests/test_canonicalization.py @@ -0,0 +1,321 @@ +"""Tests for predicate_contracts.canonicalization module.""" + +from __future__ import annotations + +from predicate_contracts.canonicalization import ( # Utility functions; Terminal canonicalization; Desktop canonicalization + DESKTOP_SCHEMA_VERSION, + TERMINAL_SCHEMA_VERSION, + build_focused_path, + canonicalize_accessibility_node, + canonicalize_desktop_snapshot, + canonicalize_terminal_snapshot, + compute_desktop_state_hash, + compute_terminal_state_hash, + hash_environment, + is_secret_key, + normalize_command, + normalize_path, + normalize_text, + normalize_timestamps, + normalize_transcript, + sha256, + strip_ansi, +) + + +class TestNormalizeText: + """Tests for normalize_text function.""" + + def test_trims_and_collapses_whitespace(self) -> None: + assert normalize_text(" Hello World ") == "hello world" + + def test_lowercases_text(self) -> None: + assert 
normalize_text("HELLO") == "hello" + + def test_caps_length_at_max_len(self) -> None: + long_text = "a" * 100 + assert len(normalize_text(long_text, 80)) == 80 + + def test_returns_empty_for_none(self) -> None: + assert normalize_text(None) == "" + + def test_returns_empty_for_empty_string(self) -> None: + assert normalize_text("") == "" + + +class TestNormalizeCommand: + """Tests for normalize_command function.""" + + def test_trims_and_collapses_whitespace_preserves_case(self) -> None: + assert normalize_command(" ls -la ") == "ls -la" + assert normalize_command(" Git Status ") == "Git Status" + + def test_returns_empty_for_none(self) -> None: + assert normalize_command(None) == "" + + +class TestStripAnsi: + """Tests for strip_ansi function.""" + + def test_removes_color_codes(self) -> None: + assert strip_ansi("\x1b[31mRed\x1b[0m") == "Red" + assert strip_ansi("\x1b[32mGreen\x1b[0m") == "Green" + + def test_removes_cursor_movement_codes(self) -> None: + assert strip_ansi("\x1b[2JClear") == "Clear" + + def test_leaves_plain_text_unchanged(self) -> None: + assert strip_ansi("Hello World") == "Hello World" + + +class TestNormalizeTimestamps: + """Tests for normalize_timestamps function.""" + + def test_replaces_iso8601_timestamps(self) -> None: + assert normalize_timestamps("2024-01-15T10:30:45.123Z") == "[TIMESTAMP]" + assert normalize_timestamps("2024-01-15 10:30:45") == "[TIMESTAMP]" + + def test_replaces_time_only_timestamps(self) -> None: + assert normalize_timestamps("Started at 10:30:45") == "Started at [TIMESTAMP]" + + def test_replaces_duration_markers(self) -> None: + assert normalize_timestamps("Completed [1.23s]") == "Completed [TIMESTAMP]" + + +class TestNormalizeTranscript: + """Tests for normalize_transcript function.""" + + def test_strips_ansi_and_normalizes_whitespace(self) -> None: + raw = "\x1b[32mPASS\x1b[0m test suite" + assert normalize_transcript(raw) == "PASS test suite" + + def test_normalizes_timestamps(self) -> None: + raw = "Completed 
at 10:30:45" + assert normalize_transcript(raw) == "Completed at [TIMESTAMP]" + + def test_removes_empty_trailing_lines(self) -> None: + raw = "Line 1\nLine 2\n\n\n" + assert normalize_transcript(raw) == "Line 1\nLine 2" + + def test_returns_empty_for_none(self) -> None: + assert normalize_transcript(None) == "" + + def test_caps_length_at_10kb(self) -> None: + huge = "x" * 20 * 1024 + assert len(normalize_transcript(huge)) <= 10 * 1024 + + +class TestNormalizePath: + """Tests for normalize_path function.""" + + def test_resolves_dot_components(self) -> None: + result = normalize_path("/foo/./bar/../baz") + assert "/." not in result + assert "/.." not in result + + def test_returns_empty_for_none(self) -> None: + assert normalize_path(None) == "" + + +class TestIsSecretKey: + """Tests for is_secret_key function.""" + + def test_detects_cloud_provider_prefixes(self) -> None: + assert is_secret_key("AWS_ACCESS_KEY_ID") is True + assert is_secret_key("AZURE_CLIENT_SECRET") is True + assert is_secret_key("GCP_SERVICE_ACCOUNT") is True + assert is_secret_key("GOOGLE_APPLICATION_CREDENTIALS") is True + + def test_detects_common_secret_suffixes(self) -> None: + assert is_secret_key("DATABASE_PASSWORD") is True + assert is_secret_key("MY_SECRET") is True + assert is_secret_key("AUTH_TOKEN") is True + assert is_secret_key("PRIVATE_KEY") is True + + def test_allows_non_secret_keys(self) -> None: + assert is_secret_key("HOME") is False + assert is_secret_key("PATH") is False + assert is_secret_key("NODE_ENV") is False + + +class TestHashEnvironment: + """Tests for hash_environment function.""" + + def test_returns_consistent_hash_for_same_env(self) -> None: + env = {"HOME": "/home/user", "PATH": "/usr/bin"} + hash1 = hash_environment(env) + hash2 = hash_environment(env) + assert hash1 == hash2 + + def test_sorts_keys_for_determinism(self) -> None: + env1 = {"B": "2", "A": "1"} + env2 = {"A": "1", "B": "2"} + assert hash_environment(env1) == hash_environment(env2) + + def 
test_redacts_secret_values(self) -> None: + with_secret = {"AWS_ACCESS_KEY_ID": "secret123", "HOME": "/home"} + with_redacted = {"AWS_ACCESS_KEY_ID": "[REDACTED]", "HOME": "/home"} + assert hash_environment(with_secret) == hash_environment(with_redacted) + + def test_returns_hash_for_none(self) -> None: + assert hash_environment(None) == sha256("") + + +class TestSha256: + """Tests for sha256 function.""" + + def test_produces_64_char_hex_hash(self) -> None: + result = sha256("hello") + assert len(result) == 64 + assert all(c in "0123456789abcdef" for c in result) + + def test_different_inputs_produce_different_hashes(self) -> None: + assert sha256("hello") != sha256("world") + + +class TestTerminalCanonicalization: + """Tests for terminal canonicalization functions.""" + + def test_canonicalize_terminal_snapshot_normalizes_all_fields(self) -> None: + raw = { + "session_id": "sess-1", + "cwd": "/tmp/./foo/../bar", + "command": " npm test ", + "transcript": "\x1b[32mOK\x1b[0m All tests passed at 10:30:45", + } + + canonical = canonicalize_terminal_snapshot(raw) + + assert canonical.session_id == "sess-1" + assert canonical.command_normalized == "npm test" + assert "OK" in canonical.transcript_normalized + assert "[TIMESTAMP]" in canonical.transcript_normalized + assert "\x1b" not in canonical.transcript_normalized + + def test_compute_terminal_state_hash_produces_prefixed_hash(self) -> None: + result = compute_terminal_state_hash( + { + "session_id": "sess-1", + "command": "npm test", + } + ) + assert result.startswith("sha256:") + assert len(result) == 7 + 64 # "sha256:" + 64 hex chars + + def test_equivalent_inputs_produce_identical_hashes(self) -> None: + snap1 = { + "session_id": "sess-1", + "command": " npm test ", + "transcript": "\x1b[32mOK\x1b[0m", + } + snap2 = { + "session_id": "sess-1", + "command": "npm test", + "transcript": "OK", + } + assert compute_terminal_state_hash(snap1) == compute_terminal_state_hash(snap2) + + def 
test_different_inputs_produce_different_hashes(self) -> None: + snap1 = {"session_id": "sess-1", "command": "npm test"} + snap2 = {"session_id": "sess-1", "command": "npm build"} + assert compute_terminal_state_hash(snap1) != compute_terminal_state_hash(snap2) + + def test_exports_terminal_schema_version(self) -> None: + assert TERMINAL_SCHEMA_VERSION == "terminal:v1.0" + + +class TestDesktopCanonicalization: + """Tests for desktop canonicalization functions.""" + + def test_canonicalize_accessibility_node_normalizes_role_and_name(self) -> None: + node = canonicalize_accessibility_node( + { + "role": "BUTTON", + "name": " Click Me ", + "children": [], + } + ) + assert node.role == "button" + assert node.name_norm == "click me" + + def test_canonicalize_accessibility_node_sorts_children(self) -> None: + node = canonicalize_accessibility_node( + { + "role": "group", + "children": [ + {"role": "button", "name": "B"}, + {"role": "button", "name": "A"}, + {"role": "link", "name": "C"}, + ], + } + ) + assert node.children[0].name_norm == "a" + assert node.children[1].name_norm == "b" + assert node.children[2].name_norm == "c" + + def test_canonicalize_accessibility_node_truncates_at_max_depth(self) -> None: + # Build a deeply nested tree + deep_node: dict = {"role": "root", "children": []} + current = deep_node + for i in range(15): + child: dict = {"role": f"level-{i}", "children": []} + current["children"] = [child] + current = child + current["children"] = [{"role": "leaf", "children": []}] + + canonical = canonicalize_accessibility_node(deep_node) + + # Find the deepest non-empty level + depth = 0 + node = canonical + while node.children: + depth += 1 + node = node.children[0] + assert depth <= 10 + + def test_build_focused_path_with_role_and_name(self) -> None: + assert build_focused_path("button", "Save") == "button[save]" + + def test_build_focused_path_with_role_only(self) -> None: + assert build_focused_path("button") == "button" + + def 
test_build_focused_path_with_nothing(self) -> None: + assert build_focused_path() == "" + + def test_canonicalize_desktop_snapshot_normalizes_all_fields(self) -> None: + raw = { + "app_name": " Firefox ", + "window_title": " GitHub - Home ", + "focused_role": "BUTTON", + "focused_name": " Sign In ", + } + + canonical = canonicalize_desktop_snapshot(raw) + + assert canonical.app_name_norm == "firefox" + assert canonical.window_title_norm == "github - home" + assert canonical.focused_path == "button[sign in]" + + def test_compute_desktop_state_hash_produces_prefixed_hash(self) -> None: + result = compute_desktop_state_hash( + { + "app_name": "Firefox", + "window_title": "GitHub", + } + ) + assert result.startswith("sha256:") + assert len(result) == 7 + 64 + + def test_equivalent_inputs_produce_identical_hashes(self) -> None: + snap1 = { + "app_name": " Firefox ", + "window_title": " GitHub ", + } + snap2 = { + "app_name": "Firefox", + "window_title": "GitHub", + } + assert compute_desktop_state_hash(snap1) == compute_desktop_state_hash(snap2) + + def test_exports_desktop_schema_version(self) -> None: + assert DESKTOP_SCHEMA_VERSION == "desktop:v1.0"