Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,57 @@ def login_example() -> None:
raise RuntimeError("login failed")
```

## Pre-action authority hook (production pattern)

If you want every action proposal to be authorized before execution, pass a
`pre_action_authorizer` into `RuntimeAgent`.

This hook receives a shared `predicate-contracts` `ActionRequest` generated from
runtime state (`snapshot` + assertion evidence). It may be a plain function or
an `async` function (awaitable results are awaited), and must return either:

- `True` / `False`, or
- an object with an `allowed: bool` field and an optional `reason: str` field
  (for richer decision payloads).

```python
from predicate.agent_runtime import AgentRuntime
from predicate.runtime_agent import RuntimeAgent, RuntimeStep

# Optional: your authority client can be a local guard, a sidecar client, or a remote API client.
def pre_action_authorizer(action_request):
# Example: call your authority service
# resp = authority_client.authorize(action_request)
# return resp
return True


runtime = AgentRuntime(backend=backend, tracer=tracer)
agent = RuntimeAgent(
runtime=runtime,
executor=executor,
pre_action_authorizer=pre_action_authorizer,
authority_principal_id="agent:web-checkout",
authority_tenant_id="tenant-a",
authority_session_id="session-123",
authority_fail_closed=True, # deny/authorizer errors block action execution
)

ok = await agent.run_step(
task_goal="Complete checkout",
step=RuntimeStep(goal="Click submit order"),
)
```

Fail-open option (not recommended for sensitive actions):

```python
agent = RuntimeAgent(
runtime=runtime,
executor=executor,
pre_action_authorizer=pre_action_authorizer,
authority_fail_closed=False, # authorizer errors allow action to proceed
)
```

## Capabilities (lifecycle guarantees)

### Controlled perception
Expand Down
34 changes: 34 additions & 0 deletions predicate/agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from .backends.protocol import BrowserBackend
from .browser import AsyncSentienceBrowser
from .tracing import Tracer
from predicate_contracts import ActionRequest


class AgentRuntime:
Expand Down Expand Up @@ -980,6 +981,39 @@ def _compute_snapshot_digest(self, snap: Snapshot | None) -> str | None:
except Exception:
return None

def build_authority_action_request(
    self,
    *,
    principal_id: str,
    action: str,
    resource: str,
    intent: str,
    tenant_id: str | None = None,
    session_id: str | None = None,
    state_source: str = "sdk-python",
) -> ActionRequest:
    """
    Assemble a predicate-contracts ActionRequest describing a proposed action.

    The keyword arguments are packed into an ``AuthorityActionInput`` and
    handed, together with this runtime, to the authority integration adapter.
    Only shared contract types cross this boundary, so sdk-python internals
    stay decoupled from authority enforcement internals.

    Args:
        principal_id: Identifier of the acting principal.
        action: Name of the action being proposed.
        resource: Resource the action targets.
        intent: Human-readable intent behind the action.
        tenant_id: Optional tenant identifier forwarded with the principal.
        session_id: Optional session identifier forwarded with the principal.
        state_source: Label forwarded as the source of the state evidence.

    Returns:
        The ``ActionRequest`` produced by the authority adapter.
    """
    # Imported at call time rather than module import time
    # (presumably to avoid an import cycle -- TODO confirm).
    from .integrations.authority import (
        AuthorityActionInput,
        build_action_request_from_runtime,
    )

    return build_action_request_from_runtime(
        runtime=self,
        action_input=AuthorityActionInput(
            principal_id=principal_id,
            action=action,
            resource=resource,
            intent=intent,
            tenant_id=tenant_id,
            session_id=session_id,
            state_source=state_source,
        ),
    )

async def emit_step_end(
self,
*,
Expand Down
13 changes: 13 additions & 0 deletions predicate/integrations/authority/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from predicate.integrations.authority.contracts_adapter import (
AuthorityActionInput,
build_action_request_from_runtime,
state_evidence_from_runtime,
to_verification_evidence,
)

__all__ = [
"AuthorityActionInput",
"build_action_request_from_runtime",
"state_evidence_from_runtime",
"to_verification_evidence",
]
88 changes: 88 additions & 0 deletions predicate/integrations/authority/contracts_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from typing import Any, Mapping, Sequence

# pylint: disable=import-error

from predicate_contracts import (
ActionRequest,
ActionSpec,
PrincipalRef,
StateEvidence,
VerificationEvidence,
VerificationSignal,
VerificationStatus,
)


@dataclass(frozen=True)
class AuthorityActionInput:
    """Immutable bundle of caller-supplied fields for one authority request.

    ``build_action_request_from_runtime`` copies these values into the
    principal and action-spec parts of an ``ActionRequest``.
    """

    # Forwarded to ``PrincipalRef.principal_id``.
    principal_id: str
    # Forwarded to ``ActionSpec.action``.
    action: str
    # Forwarded to ``ActionSpec.resource``.
    resource: str
    # Forwarded to ``ActionSpec.intent``.
    intent: str
    # Forwarded to ``PrincipalRef.tenant_id``; optional.
    tenant_id: str | None = None
    # Forwarded to ``PrincipalRef.session_id``; optional.
    session_id: str | None = None
    # Used as the ``source`` of the generated state evidence.
    state_source: str = "sdk-python"


def to_verification_evidence(assertions: Sequence[Mapping[str, Any]]) -> VerificationEvidence:
    """Convert runtime assertion records into ``VerificationEvidence``.

    Each record yields one ``VerificationSignal``. Records whose ``"label"``
    is missing or blank after stripping are skipped. ``"passed"`` and
    ``"required"`` default to ``False``; ``"reason"`` is kept only when it is
    a non-empty string.

    Args:
        assertions: Mapping records with ``label``, ``passed``, ``required``
            and ``reason`` keys (all optional).

    Returns:
        Evidence whose ``signals`` tuple preserves the input order.
    """

    def _signal_for(record: Mapping[str, Any]) -> VerificationSignal | None:
        # Build the signal for one record, or None when it has no usable label.
        name = str(record.get("label", "")).strip()
        if not name:
            return None
        did_pass = bool(record.get("passed", False))
        is_required = bool(record.get("required", False))
        why = record.get("reason")
        why = str(why) if isinstance(why, str) and why != "" else None
        if did_pass:
            status = VerificationStatus.PASSED
        else:
            status = VerificationStatus.FAILED
        return VerificationSignal(
            label=name,
            status=status,
            required=is_required,
            reason=why,
        )

    collected = tuple(
        signal for signal in map(_signal_for, assertions) if signal is not None
    )
    return VerificationEvidence(signals=collected)


def state_evidence_from_runtime(runtime: Any, source: str = "sdk-python") -> StateEvidence:
    """Build ``StateEvidence`` from a runtime's latest snapshot.

    Reads ``runtime.last_snapshot`` and ``runtime.step_id`` (each treated as
    ``None`` when the attribute is absent) and hashes them with
    ``_snapshot_state_hash``.

    Args:
        runtime: Object that may expose ``last_snapshot`` and ``step_id``.
        source: Label stored as the evidence source.

    Returns:
        ``StateEvidence`` carrying ``source`` and the computed state hash.
    """
    return StateEvidence(
        source=source,
        state_hash=_snapshot_state_hash(
            snapshot=getattr(runtime, "last_snapshot", None),
            step_id=getattr(runtime, "step_id", None),
        ),
    )


def build_action_request_from_runtime(runtime: Any, action_input: AuthorityActionInput) -> ActionRequest:
    """Build an ``ActionRequest`` from runtime state plus caller input.

    Verification evidence comes from the ``"assertions"`` entry of
    ``runtime.get_assertions_for_step_end()`` (empty when the key is absent).
    State evidence comes from ``state_evidence_from_runtime``.

    Args:
        runtime: Runtime exposing ``get_assertions_for_step_end()``.
        action_input: Principal and action fields for the request.

    Returns:
        The assembled ``ActionRequest``.
    """
    recorded = runtime.get_assertions_for_step_end().get("assertions", [])
    verification = to_verification_evidence(recorded)
    state = state_evidence_from_runtime(runtime=runtime, source=action_input.state_source)

    who = PrincipalRef(
        principal_id=action_input.principal_id,
        tenant_id=action_input.tenant_id,
        session_id=action_input.session_id,
    )
    what = ActionSpec(
        action=action_input.action,
        resource=action_input.resource,
        intent=action_input.intent,
    )
    return ActionRequest(
        principal=who,
        action_spec=what,
        state_evidence=state,
        verification_evidence=verification,
    )


def _snapshot_state_hash(snapshot: Any, step_id: str | None) -> str:
url = str(getattr(snapshot, "url", "") or "")
timestamp = str(getattr(snapshot, "timestamp", "") or "")
if url != "" or timestamp != "":
digest = hashlib.sha256(f"{url}{timestamp}".encode("utf-8")).hexdigest()
return "sha256:" + digest
fallback_material = step_id or "missing_snapshot"
fallback_digest = hashlib.sha256(fallback_material.encode("utf-8")).hexdigest()
return "sha256:" + fallback_digest
96 changes: 89 additions & 7 deletions predicate/runtime_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class ActOnceResult:
used_vision: bool


@dataclass(frozen=True)
class PreActionAuthorityDecision:
    """Normalized outcome of a pre-action authorization check."""

    # True when the proposed action may be executed.
    allowed: bool
    # Optional explanation; included in the denial error message when present.
    reason: str | None = None


class RuntimeAgent:
"""
A thin orchestration layer over AgentRuntime:
Expand All @@ -79,12 +85,22 @@ def __init__(
vision_executor: LLMProvider | None = None,
vision_verifier: LLMProvider | None = None,
short_circuit_canvas: bool = True,
pre_action_authorizer: Callable[[Any], Any] | None = None,
authority_principal_id: str | None = None,
authority_tenant_id: str | None = None,
authority_session_id: str | None = None,
authority_fail_closed: bool = True,
) -> None:
self.runtime = runtime
self.executor = executor
self.vision_executor = vision_executor
self.vision_verifier = vision_verifier
self.short_circuit_canvas = short_circuit_canvas
self.pre_action_authorizer = pre_action_authorizer
self.authority_principal_id = authority_principal_id
self.authority_tenant_id = authority_tenant_id
self.authority_session_id = authority_session_id
self.authority_fail_closed = authority_fail_closed

self._structured_llm = LLMInteractionHandler(executor)

Expand Down Expand Up @@ -120,7 +136,7 @@ async def run_step(

# 1) Structured executor attempt.
action = self._propose_structured_action(task_goal=task_goal, step=step, snap=snap)
await self._execute_action(action=action, snap=snap)
await self._execute_action(action=action, snap=snap, step_goal=step.goal)
ok = await self._apply_verifications(step=step)
if ok:
outcome = "ok"
Expand Down Expand Up @@ -268,7 +284,7 @@ async def act_once_result(
temperature=0.0,
)
action = self._extract_action_from_text(resp.content)
await self._execute_action(action=action, snap=snap)
await self._execute_action(action=action, snap=snap, step_goal=step.goal)
return ActOnceResult(action=action, snap=snap, used_vision=True)

# Structured snapshot-first proposal.
Expand All @@ -290,7 +306,7 @@ async def act_once_result(
resp = self._structured_llm.query_llm(dom_context, combined_goal)
action = self._structured_llm.extract_action(resp.content)

await self._execute_action(action=action, snap=snap)
await self._execute_action(action=action, snap=snap, step_goal=step.goal)
return ActOnceResult(action=action, snap=snap, used_vision=False)

async def _run_hook(
Expand Down Expand Up @@ -367,7 +383,7 @@ async def _vision_executor_attempt(
)

action = self._extract_action_from_text(resp.content)
await self._execute_action(action=action, snap=snap)
await self._execute_action(action=action, snap=snap, step_goal=step.goal)
# Important: vision executor fallback is a *retry* of the same step.
# Clear prior step assertions so required_assertions_passed reflects the final attempt.
self.runtime.flush_assertions()
Expand Down Expand Up @@ -397,21 +413,28 @@ async def _apply_verifications(self, *, step: RuntimeStep) -> bool:
# Respect required verifications semantics.
return self.runtime.required_assertions_passed() and all_ok

async def _execute_action(self, *, action: str, snap: Snapshot | None) -> None:
async def _execute_action(self, *, action: str, snap: Snapshot | None, step_goal: str | None) -> None:
url = None
try:
url = await self.runtime.get_url()
except Exception:
url = getattr(snap, "url", None)

await self.runtime.record_action(action, url=url)

# Coordinate-backed execution (by snapshot id or explicit coordinates).
kind, payload = self._parse_action(action)

if kind == "finish":
await self.runtime.record_action(action, url=url)
return

await self._authorize_pre_action_or_raise(
action=action,
kind=kind,
url=url,
step_goal=step_goal,
)
await self.runtime.record_action(action, url=url)

if kind == "press":
await self._press_key_best_effort(payload["key"])
await self._stabilize_best_effort()
Expand Down Expand Up @@ -449,6 +472,65 @@ async def _execute_action(self, *, action: str, snap: Snapshot | None) -> None:

raise ValueError(f"Unknown action kind: {kind}")

async def _authorize_pre_action_or_raise(
self,
*,
action: str,
kind: str,
url: str | None,
step_goal: str | None,
) -> None:
if self.pre_action_authorizer is None:
return
principal_id = self.authority_principal_id or "agent:sdk-python"
action_name = self._authority_action_name(kind)
resource = url or "about:blank"
intent = step_goal or action

try:
request = self.runtime.build_authority_action_request(
principal_id=principal_id,
action=action_name,
resource=resource,
intent=intent,
tenant_id=self.authority_tenant_id,
session_id=self.authority_session_id,
)
decision_raw = self.pre_action_authorizer(request)
if inspect.isawaitable(decision_raw):
decision_raw = await decision_raw
decision = self._normalize_authority_decision(decision_raw)
if decision.allowed:
return
raise RuntimeError(
f"pre_action_authority_denied: {decision.reason or 'denied_by_authority'}"
)
except Exception:
if self.authority_fail_closed:
raise
return

def _normalize_authority_decision(self, value: Any) -> PreActionAuthorityDecision:
    """
    Coerce an authorizer's return value into a ``PreActionAuthorityDecision``.

    Accepted inputs are an existing ``PreActionAuthorityDecision``, a plain
    ``bool``, or any object whose ``allowed`` attribute is a ``bool``. For the
    last form, a non-empty string ``reason`` attribute is carried over.

    Raises:
        RuntimeError: ``invalid_pre_action_authority_decision`` for any other value.
    """
    if isinstance(value, PreActionAuthorityDecision):
        return value
    if isinstance(value, bool):
        return PreActionAuthorityDecision(allowed=value)

    verdict = getattr(value, "allowed", None)
    if not isinstance(verdict, bool):
        raise RuntimeError("invalid_pre_action_authority_decision")

    explanation = getattr(value, "reason", None)
    if isinstance(explanation, str) and explanation:
        explanation = str(explanation)
    else:
        explanation = None
    return PreActionAuthorityDecision(allowed=verdict, reason=explanation)

def _authority_action_name(self, kind: str) -> str:
if kind in {"click", "click_xy", "click_rect"}:
return "browser.click"
if kind == "type":
return "browser.type"
if kind == "press":
return "browser.press"
return "browser.unknown"

async def _stabilize_best_effort(self) -> None:
try:
await self.runtime.backend.wait_ready_state(state="interactive", timeout_ms=15000)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"httpx>=0.25.0", # For async API calls
"playwright-stealth>=1.0.6", # Bot evasion and stealth mode
"markdownify>=0.11.6", # Enhanced HTML to Markdown conversion
"predicate-contracts",
]

[project.urls]
Expand Down
Loading
Loading