diff --git a/docs/guides/mcp.md b/docs/guides/mcp.md index 756f226..0c35137 100644 --- a/docs/guides/mcp.md +++ b/docs/guides/mcp.md @@ -1,521 +1,283 @@ -# MCP Security Integration +# MCP Service Client (Low-Level gRPC) -**Model Context Protocol (MCP) integration for validating tool access and server identity.** +**Low-level gRPC client for MCP security operations via capiscio-core.** -The CapiscIO Python SDK provides security middleware for MCP tools, implementing: +The CapiscIO Python SDK provides access to MCP security operations (RFC-006 Tool Authority +and RFC-007 Server Identity) through the `MCPClient` gRPC wrapper. -- **RFC-006**: Tool access evaluation based on trust levels -- **RFC-007**: Server identity verification via trust badges +!!! note "Looking for high-level MCP integration?" + This guide documents the **low-level gRPC API** for direct capiscio-core access. + + For a high-level MCP integration library with decorators like `@guard`, see the + **[capiscio-mcp-python](https://github.com/capiscio/capiscio-mcp-python)** package: + + ```bash + pip install capiscio-mcp + ``` -## Installation +## Overview -The MCP module requires the `mcp` extra: +The SDK's `MCPClient` provides direct access to capiscio-core's MCPService gRPC methods: -```bash -pip install capiscio-sdk[mcp] -``` +- **`evaluate_tool_access()`** - RFC-006 §6.2-6.4: Evaluate tool access and emit evidence +- **`verify_server_identity()`** - RFC-007 §7.2: Verify server identity from DID + badge +- **`parse_server_identity_http()`** - RFC-007 §5.2: Extract identity from HTTP headers +- **`parse_server_identity_jsonrpc()`** - RFC-007 §5.3: Extract identity from JSON-RPC _meta +- **`health()`** - Service health and version check ## Quick Start -### Evaluate Tool Access (RFC-006) - -Before allowing a tool to execute, evaluate whether the calling server has sufficient trust: - ```python -from capiscio_sdk.mcp import evaluate_tool_access, TrustLevel, DenyReason - -# Evaluate whether a server can access a tool -result = evaluate_tool_access( - tool_name="file_read", - server_endpoint="https://agent.example.com", - trust_level=TrustLevel.VERIFIED -) - -if result.allow: - # Proceed with tool execution - print(f"Access granted with trust level: {result.trust_level}") -else: - # Handle denial - print(f"Access denied: {result.deny_reason}") -``` - -### Verify Server Identity (RFC-007) - -Verify that an MCP server has a valid trust badge: - -```python -from capiscio_sdk.mcp import verify_server_identity, ServerState - -result = verify_server_identity( - server_endpoint="https://mcp-server.example.com", - expected_did="did:web:example.com" # Optional -) +from capiscio_sdk._rpc.client import CapiscioRPCClient + +# Connect to capiscio-core gRPC server +client = CapiscioRPCClient(address="localhost:50051") +client.connect() + +try: + # Check service health + health = client.mcp.health() + print(f"MCP service: {health['core_version']}") + + # Evaluate tool access (RFC-006) + result = client.mcp.evaluate_tool_access( + tool_name="read_file", + params_hash="sha256:abc123", + server_origin="https://example.com", + badge_jws=badge_token, # Caller's badge + min_trust_level=1, + ) + + if result["decision"] == "allow": + print(f"Access granted for {result['agent_did']}") + else: + print(f"Access denied: {result['deny_reason']}") -if result.state == ServerState.VERIFIED: - print(f"Server verified! DID: {result.did}") - print(f"Trust badge: {result.badge_jws}") -else: - print(f"Verification failed: {result.state}") +finally: + client.close() ``` --- -## Core Functions +## MCPClient Methods ### `evaluate_tool_access()` -Evaluates whether a tool access request should be allowed based on the server's trust level. - -```python -def evaluate_tool_access( - tool_name: str, - server_endpoint: str, - trust_level: TrustLevel, - required_level: TrustLevel = TrustLevel.REGISTERED, - tool_policy: ToolPolicy | None = None, -) -> ToolAccessResult: - """ - Evaluate tool access based on trust level. - - Args: - tool_name: Name of the tool being accessed - server_endpoint: URL of the requesting MCP server - trust_level: Current trust level of the server - required_level: Minimum trust level required (default: REGISTERED) - tool_policy: Optional custom policy for this tool - - Returns: - ToolAccessResult with allow/deny decision and metadata - """ -``` - -#### Parameters - -| Parameter | Type | Required | Description | -|-----------|------|----------|-------------| -| `tool_name` | `str` | ✅ | Name of the tool being accessed | -| `server_endpoint` | `str` | ✅ | URL of the requesting MCP server | -| `trust_level` | `TrustLevel` | ✅ | Server's current trust level | -| `required_level` | `TrustLevel` | ❌ | Minimum required trust (default: `REGISTERED`) | -| `tool_policy` | `ToolPolicy` | ❌ | Custom access policy for this tool | - -#### Return Value - -`ToolAccessResult` with the following attributes: +Evaluate tool access request (RFC-006 §6.2-6.4). Returns both a decision and evidence record atomically. ```python -@dataclass -class ToolAccessResult: - allow: bool # Whether access is granted - trust_level: TrustLevel # Effective trust level - deny_reason: DenyReason | None # Reason if denied - tool_name: str # Tool that was evaluated - server_endpoint: str # Server that requested access - evaluated_at: datetime # When evaluation occurred +result = client.mcp.evaluate_tool_access( + tool_name="write_file", + params_hash="sha256:...", # Hash of tool parameters for audit + server_origin="https://...", # MCP server origin + badge_jws=badge_token, # Caller's badge (or use api_key) + # api_key="...", # Alternative: API key auth + min_trust_level=2, # Minimum required trust (0-4) + accept_level_zero=False, # Accept self-signed badges? + allowed_tools=["read_file", "write_file"], # Optional allowlist + trusted_issuers=["did:web:capiscio.io"], # Trusted badge issuers +) ``` ---- +**Returns dict with:** + +| Field | Type | Description | +|-------|------|-------------| +| `decision` | `str` | `"allow"` or `"deny"` | +| `deny_reason` | `str` | Reason if denied (e.g., `"trust_insufficient"`) | +| `deny_detail` | `str` | Detailed error message | +| `agent_did` | `str` | DID of authenticated agent | +| `badge_jti` | `str` | Badge JTI if badge was used | +| `auth_level` | `str` | `"anonymous"`, `"api_key"`, or `"badge"` | +| `trust_level` | `int` | Agent's trust level (0-4) | +| `evidence_json` | `str` | RFC-006 §7 evidence record | +| `evidence_id` | `str` | Unique evidence ID | +| `timestamp` | `str` | ISO timestamp | ### `verify_server_identity()` -Verifies an MCP server's identity through its trust badge. +Verify MCP server identity (RFC-007 §7.2). Checks DID + badge and transport origin binding. ```python -def verify_server_identity( - server_endpoint: str, - expected_did: str | None = None, - timeout: float = 10.0, - verify_tls: bool = True, -) -> ServerIdentityResult: - """ - Verify server identity via trust badge. - - Args: - server_endpoint: MCP server URL to verify - expected_did: Optional expected DID for binding verification - timeout: Request timeout in seconds - verify_tls: Whether to verify TLS certificates - - Returns: - ServerIdentityResult with verification state and badge data - """ +result = client.mcp.verify_server_identity( + server_did="did:web:example.com:mcp:server", + server_badge=badge_token, # Server's badge JWT + transport_origin="https://example.com", + endpoint_path="/mcp", + min_trust_level=1, + accept_level_zero=False, + offline_mode=False, # Use cache only? + skip_origin_binding=False, # Skip RFC-007 §5.3 check? +) ``` -#### Parameters - -| Parameter | Type | Required | Description | -|-----------|------|----------|-------------| -| `server_endpoint` | `str` | ✅ | MCP server URL to verify | -| `expected_did` | `str` | ❌ | Expected DID for binding check | -| `timeout` | `float` | ❌ | Request timeout (default: 10.0s) | -| `verify_tls` | `bool` | ❌ | Verify TLS certs (default: `True`) | +**Returns dict with:** -#### Return Value - -`ServerIdentityResult` with the following attributes: - -```python -@dataclass -class ServerIdentityResult: - state: ServerState # Verification result state - server_endpoint: str # Server that was verified - did: str | None # Resolved DID (if verified) - badge_jws: str | None # Raw badge JWS (if retrieved) - badge_payload: dict | None # Decoded badge payload - error: str | None # Error message (if failed) - verified_at: datetime # When verification occurred -``` - ---- +| Field | Type | Description | +|-------|------|-------------| +| `state` | `str` | `"verified_principal"`, `"declared_principal"`, or `"unverified_origin"` | +| `trust_level` | `int` | Server's trust level (0-4) | +| `server_did` | `str` | Verified server DID | +| `badge_jti` | `str` | Server badge JTI | +| `error_code` | `str` | Error code if verification failed | +| `error_detail` | `str` | Detailed error message | ### `parse_server_identity_http()` -Extract server identity from HTTP response headers. +Parse server identity from HTTP headers (RFC-007 §5.2). ```python -def parse_server_identity_http( - headers: dict[str, str] -) -> ServerIdentity | None: - """ - Parse server identity from HTTP headers. - - Looks for: - - X-CapiscIO-Badge: JWS trust badge - - X-CapiscIO-DID: Server DID - - Args: - headers: HTTP response headers - - Returns: - ServerIdentity if badge found, None otherwise - """ -``` - -#### Example - -```python -import httpx -from capiscio_sdk.mcp import parse_server_identity_http - -response = httpx.get("https://mcp-server.example.com/health") -identity = parse_server_identity_http(dict(response.headers)) +result = client.mcp.parse_server_identity_http( + capiscio_server_did="did:web:example.com:server", + capiscio_server_badge="eyJhbGc...", +) -if identity: - print(f"Server DID: {identity.did}") - print(f"Badge issued: {identity.badge_issued_at}") +if result["identity_present"]: + # Verify the extracted identity + verification = client.mcp.verify_server_identity( + server_did=result["server_did"], + server_badge=result["server_badge"], + transport_origin="https://example.com", + ) ``` ---- - -### `parse_server_identity_jsonrpc()` +**Returns dict with:** -Extract server identity from JSON-RPC response metadata. +| Field | Type | Description | +|-------|------|-------------| +| `server_did` | `str` | Extracted server DID | +| `server_badge` | `str` | Extracted server badge | +| `identity_present` | `bool` | Whether identity was found | -```python -def parse_server_identity_jsonrpc( - response: dict -) -> ServerIdentity | None: - """ - Parse server identity from JSON-RPC response. - - Looks for identity in: - - response["_meta"]["capiscio"] - - response["result"]["_meta"]["capiscio"] - - Args: - response: JSON-RPC response dict - - Returns: - ServerIdentity if found, None otherwise - """ -``` +### `parse_server_identity_jsonrpc()` -#### Example +Parse server identity from JSON-RPC _meta (RFC-007 §5.3). For stdio transport. ```python -from capiscio_sdk.mcp import parse_server_identity_jsonrpc +import json -# JSON-RPC response with embedded identity -response = { - "jsonrpc": "2.0", - "id": 1, - "result": {...}, +meta = json.dumps({ "_meta": { - "capiscio": { - "did": "did:web:example.com", - "badge": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9..." - } + "serverDid": "did:web:example.com", + "serverBadge": "eyJhbGc..." } -} +}) -identity = parse_server_identity_jsonrpc(response) -if identity: - print(f"Verified server: {identity.did}") +result = client.mcp.parse_server_identity_jsonrpc(meta_json=meta) ``` ---- - -## Types Reference - -### TrustLevel +### `health()` -Trust levels in ascending order of privilege: +Check MCP service health and version compatibility. ```python -class TrustLevel(str, Enum): - UNKNOWN = "unknown" # No trust information - REGISTERED = "registered" # Basic registration only - VERIFIED = "verified" # Identity verified - AUDITED = "audited" # Security audit passed - CERTIFIED = "certified" # Full certification -``` - -### DenyReason +result = client.mcp.health(client_version="capiscio-sdk-python/1.0.0") -Reasons for access denial: - -```python -class DenyReason(str, Enum): - INSUFFICIENT_TRUST = "insufficient_trust" # Trust level too low - EXPIRED_BADGE = "expired_badge" # Badge has expired - REVOKED_BADGE = "revoked_badge" # Badge was revoked - INVALID_SIGNATURE = "invalid_signature" # Badge signature invalid - POLICY_VIOLATION = "policy_violation" # Tool policy not satisfied - UNKNOWN_SERVER = "unknown_server" # Server not recognized +print(f"Healthy: {result['healthy']}") +print(f"Core version: {result['core_version']}") +print(f"Proto version: {result['proto_version']}") +print(f"Compatible: {result['version_compatible']}") ``` -### ServerState +--- -Server verification states: +## Trust Levels (RFC-002 §5) -```python -class ServerState(str, Enum): - VERIFIED = "verified" # Successfully verified - UNVERIFIED = "unverified" # No badge found - EXPIRED = "expired" # Badge expired - REVOKED = "revoked" # Badge revoked - INVALID = "invalid" # Invalid badge data - UNREACHABLE = "unreachable" # Could not reach server - TIMEOUT = "timeout" # Request timed out - ERROR = "error" # Other error occurred -``` +| Level | Name | Description | +|-------|------|-------------| +| 0 | Self-Signed | Development only, no verification | +| 1 | Registered | Basic registration with registry | +| 2 | Domain-Validated (DV) | Domain ownership verified | +| 3 | Organization-Validated (OV) | Organization identity verified | +| 4 | Extended-Validation (EV) | Full audit and certification | -### ServerIdentity +--- -Server identity information: +## Deny Reasons -```python -@dataclass -class ServerIdentity: - did: str # Server's DID - badge_jws: str # Raw JWS badge - badge_issued_at: datetime # When badge was issued - badge_expires_at: datetime # When badge expires - trust_level: TrustLevel # Server's trust level -``` +| Reason | Description | +|--------|-------------| +| `badge_missing` | No badge or API key provided | +| `badge_invalid` | Badge signature invalid | +| `badge_expired` | Badge has expired | +| `badge_revoked` | Badge was revoked | +| `trust_insufficient` | Trust level below minimum | +| `tool_not_allowed` | Tool not in allowed list | +| `issuer_untrusted` | Badge issuer not trusted | +| `policy_denied` | Custom policy denied access | -### ToolPolicy +--- -Custom tool access policies: +## Server States -```python -@dataclass -class ToolPolicy: - required_trust_level: TrustLevel # Minimum trust required - allowed_dids: list[str] | None # Allowlist of DIDs - blocked_dids: list[str] | None # Blocklist of DIDs - require_badge: bool # Require valid badge -``` +| State | Description | +|-------|-------------| +| `verified_principal` | Server identity fully verified | +| `declared_principal` | DID declared but not fully verified | +| `unverified_origin` | Could not verify transport origin binding | --- -## Integration Patterns - -### FastAPI Middleware +## Example: Full MCP Flow ```python -from fastapi import FastAPI, Request, HTTPException -from capiscio_sdk.mcp import ( - verify_server_identity, - parse_server_identity_http, - ServerState -) +from capiscio_sdk._rpc.client import CapiscioRPCClient -app = FastAPI() - -@app.middleware("http") -async def mcp_security_middleware(request: Request, call_next): - # Check for CapiscIO badge in headers - identity = parse_server_identity_http(dict(request.headers)) +def mcp_tool_handler(request, badge_token: str): + """Handle an MCP tool call with security validation.""" + + client = CapiscioRPCClient() + client.connect() - if identity: - # Verify the badge - result = verify_server_identity( - server_endpoint=str(request.client.host), - expected_did=identity.did + try: + # 1. Evaluate tool access + access = client.mcp.evaluate_tool_access( + tool_name=request.tool_name, + params_hash=hash_params(request.params), + server_origin=request.origin, + badge_jws=badge_token, + min_trust_level=2, # Require DV or higher ) - if result.state != ServerState.VERIFIED: - raise HTTPException( - status_code=403, - detail=f"Server verification failed: {result.state}" - ) + if access["decision"] != "allow": + return {"error": access["deny_detail"]} - # Attach verified identity to request state - request.state.mcp_identity = identity - - return await call_next(request) -``` - -### Tool Decorator - -```python -from functools import wraps -from capiscio_sdk.mcp import evaluate_tool_access, TrustLevel - -def require_trust(level: TrustLevel): - """Decorator to require minimum trust level for tool access.""" - def decorator(func): - @wraps(func) - def wrapper(tool_name: str, server_endpoint: str, trust_level: TrustLevel, *args, **kwargs): - result = evaluate_tool_access( - tool_name=tool_name, - server_endpoint=server_endpoint, - trust_level=trust_level, - required_level=level - ) - - if not result.allow: - raise PermissionError( - f"Tool '{tool_name}' requires {level.value} trust, " - f"but server has {trust_level.value}: {result.deny_reason}" - ) - - return func(tool_name, server_endpoint, trust_level, *args, **kwargs) - return wrapper - return decorator - -# Usage -@require_trust(TrustLevel.VERIFIED) -def sensitive_tool(tool_name: str, server_endpoint: str, trust_level: TrustLevel): - """This tool requires VERIFIED trust level.""" - return {"result": "sensitive operation completed"} -``` - -### Async Verification - -```python -import asyncio -from capiscio_sdk.mcp import verify_server_identity, ServerState - -async def verify_servers(endpoints: list[str]) -> dict[str, ServerState]: - """Verify multiple servers concurrently.""" - - async def verify_one(endpoint: str) -> tuple[str, ServerState]: - # verify_server_identity is sync, run in executor - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - verify_server_identity, - endpoint - ) - return endpoint, result.state - - tasks = [verify_one(ep) for ep in endpoints] - results = await asyncio.gather(*tasks) - - return dict(results) - -# Usage -async def main(): - servers = [ - "https://server1.example.com", - "https://server2.example.com", - "https://server3.example.com", - ] - - states = await verify_servers(servers) - for server, state in states.items(): - print(f"{server}: {state}") + # 2. Execute tool (access granted) + result = execute_tool(request.tool_name, request.params) + + # 3. Return with evidence ID for audit trail + return { + "result": result, + "_evidence_id": access["evidence_id"], + } + + finally: + client.close() ``` --- -## Best Practices - -### 1. Cache Verification Results - -Server identity verification involves network calls. Cache results appropriately: +## Related Documentation -```python -from functools import lru_cache -from datetime import datetime, timedelta - -@lru_cache(maxsize=100) -def cached_verify(server_endpoint: str, cache_key: str) -> ServerIdentityResult: - """Cache verification for 5 minutes using time-based cache key.""" - return verify_server_identity(server_endpoint) - -def verify_with_cache(server_endpoint: str) -> ServerIdentityResult: - # Generate cache key that expires every 5 minutes - cache_key = datetime.now().strftime("%Y%m%d%H%M")[:-1] # Truncate to 5min - return cached_verify(server_endpoint, cache_key) -``` +- [Badge Verification Guide](badge-verification.md) - Trust badge operations +- [Configuration Guide](configuration.md) - SDK configuration +- [API Reference](../api-reference.md#mcpclient-rfc-006--rfc-007) - Full MCPClient API -### 2. Fail Secure +## High-Level Alternative -Default to denying access when verification fails: +For high-level MCP integration with decorators and middleware, use **capiscio-mcp-python**: -```python -def secure_tool_access(tool_name: str, server_endpoint: str) -> bool: - try: - result = verify_server_identity(server_endpoint) - return result.state == ServerState.VERIFIED - except Exception as e: - # Log the error but default to DENY - logger.error(f"Verification failed for {server_endpoint}: {e}") - return False +```bash +pip install capiscio-mcp ``` -### 3. Use Tool Policies for Sensitive Operations - ```python -# Define policies for sensitive tools -TOOL_POLICIES = { - "file_write": ToolPolicy( - required_trust_level=TrustLevel.AUDITED, - require_badge=True - ), - "execute_code": ToolPolicy( - required_trust_level=TrustLevel.CERTIFIED, - allowed_dids=["did:web:trusted-partner.com"], - require_badge=True - ), - "read_config": ToolPolicy( - required_trust_level=TrustLevel.VERIFIED, - require_badge=True - ), -} - -def evaluate_with_policy(tool_name: str, server_endpoint: str, trust_level: TrustLevel): - policy = TOOL_POLICIES.get(tool_name) - return evaluate_tool_access( - tool_name=tool_name, - server_endpoint=server_endpoint, - trust_level=trust_level, - tool_policy=policy - ) -``` - ---- +from capiscio_mcp import guard -## Related Documentation - -- [Badge Verification Guide](badge-verification.md) - Core badge verification patterns -- [Configuration Guide](configuration.md) - SDK configuration options -- [Scoring System](scoring.md) - Trust scoring methodology - -## RFC References +@guard(min_trust_level=2) +async def my_tool(param: str) -> str: + """This tool requires Trust Level 2+.""" + return f"Result: {param}" +``` -- [RFC-006: MCP Tool Access Evaluation](/rfcs/rfc-006/) -- [RFC-007: MCP Server Identity Verification](/rfcs/rfc-007/) +See [capiscio-mcp documentation](https://docs.capiscio.io/mcp-python/) for details. diff --git a/mkdocs.yml b/mkdocs.yml index 6ad09ad..8fe4c55 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -64,7 +64,7 @@ nav: - Core Concepts: getting-started/concepts.md - Guides: - Badge Verification: guides/badge-verification.md - - MCP Security: guides/mcp.md + - MCP Service (gRPC): guides/mcp.md - Scoring System: guides/scoring.md - Configuration: guides/configuration.md - API Reference: api-reference.md