"""
xProof Agent Audit Log Guard — CrewAI
Standard: https://xproof.app/.well-known/agent-audit-schema.json

CRITICAL: AuditGuardTool will RAISE AN EXCEPTION if xProof certification fails.
Add it as the FIRST tool in your crew's tool list.

Install: pip install crewai crewai-tools requests
"""

import datetime
import hashlib
import json
import os
import uuid

import requests
from crewai_tools import BaseTool

# Obtain a key from https://xproof.app/api/agent/register.
# The XPROOF_API_KEY environment variable takes precedence so the secret does
# not have to live in source control; the placeholder is used only when the
# variable is unset or empty.
XPROOF_API_KEY = os.environ.get("XPROOF_API_KEY") or "pm_YOUR_API_KEY"
XPROOF_BASE_URL = "https://xproof.app"


class AuditRequiredError(Exception):
    """Raised when execution is blocked due to missing xProof audit certificate."""


class AuditGuardTool(BaseTool):
    """
    xProof Audit Guard — requests an audit certificate before an action runs.

    Add this as the FIRST tool in your CrewAI agent's tools list. Every failure
    path in ``_run`` (network error, non-2xx status, malformed body, missing
    ``proof_id``) raises ``AuditRequiredError``.

    NOTE(review): whether a raised exception actually halts the crew depends on
    how the CrewAI runtime handles tool errors — confirm for your version.

    Usage:
        tools = [AuditGuardTool(), your_other_tools...]
    """

    name: str = "xproof_audit_guard"
    description: str = (
        "REQUIRED: Call this tool BEFORE executing any critical action. "
        "Certifies the agent's decision on the MultiversX blockchain. "
        "Returns a proof_id compliance certificate. "
        "BLOCKS execution if certification fails."
    )

    def _run(
        self,
        action_type: str,
        action_description: str,
        inputs_hash: str,
        risk_level: str,
        decision: str,
        agent_id: str = "crewai-agent",
        risk_summary: str = "",
    ) -> str:
        """Submit the decision to the xProof audit endpoint and return the receipt.

        Args:
            action_type: Sent verbatim as ``action_type``.
            action_description: Sent verbatim; also echoed in the success text.
            inputs_hash: Sent verbatim as ``inputs_hash`` (see
                ``compute_inputs_hash`` for one way to produce it).
            risk_level: Sent verbatim as ``risk_level``.
            decision: Sent verbatim as ``decision``.
            agent_id: Sent verbatim as ``agent_id``.
            risk_summary: Included in the payload only when non-empty.

        Returns:
            A multi-line text receipt containing the ``proof_id``, ``audit_url``,
            ``decision`` and ``risk_level`` fields from the API response.

        Raises:
            AuditRequiredError: If the request fails, the status is not 200/201,
                the body is not a JSON object, or it carries no ``proof_id``.
        """
        # Aware "now" in UTC, rendered without the "+00:00" offset so the wire
        # format stays "<ISO-8601>Z", exactly as the deprecated utcnow() gave.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now_utc.replace(tzinfo=None).isoformat() + "Z"

        payload = {
            "agent_id": agent_id,
            # A fresh session id is generated for every call.
            "session_id": str(uuid.uuid4()),
            "action_type": action_type,
            "action_description": action_description,
            "inputs_hash": inputs_hash,
            "risk_level": risk_level,
            "decision": decision,
            "timestamp": timestamp,
        }
        if risk_summary:
            payload["risk_summary"] = risk_summary

        # Keep the try body to the network call only, so that parsing problems
        # below are not misreported as connectivity failures.
        try:
            response = requests.post(
                f"{XPROOF_BASE_URL}/api/audit",
                json=payload,
                headers={
                    "Authorization": f"Bearer {XPROOF_API_KEY}",
                    "Content-Type": "application/json",
                },
                timeout=15,
            )
        except requests.RequestException as e:
            raise AuditRequiredError(
                f"EXECUTION BLOCKED. Cannot reach xProof API: {e}"
            ) from e

        if response.status_code not in (200, 201):
            raise AuditRequiredError(
                f"EXECUTION BLOCKED. xProof certification failed (HTTP {response.status_code}). "
                f"Agent cannot proceed without audit certificate."
            )

        # A 2xx response with a non-JSON body must still block through
        # AuditRequiredError rather than leak a bare ValueError.
        try:
            data = response.json()
        except ValueError as e:
            raise AuditRequiredError(
                "EXECUTION BLOCKED. xProof returned a response that is not valid JSON."
            ) from e

        # Valid JSON that is not an object (e.g. a list) has no .get().
        if not isinstance(data, dict):
            raise AuditRequiredError(
                "EXECUTION BLOCKED. xProof returned an unexpected response format."
            )

        proof_id = data.get("proof_id")
        if not proof_id:
            raise AuditRequiredError("xProof returned no proof_id — execution blocked.")

        return (
            f"AUDIT CERTIFIED. proof_id={proof_id}\n"
            f"audit_url={data.get('audit_url')}\n"
            f"decision={data.get('decision')} | risk={data.get('risk_level')}\n"
            f"You may now proceed with: {action_description}"
        )


def compute_inputs_hash(*inputs: object) -> str:
    """Return the SHA-256 hex digest of the inputs the agent analyzed.

    Each input is converted with ``str()``, the resulting list is serialized
    as a JSON array, and that text is hashed. The digest therefore depends on
    argument order and on each value's ``str()`` form; containers such as
    dicts are NOT canonicalized, so equal dicts with different insertion
    order hash differently.
    """
    serialized = json.dumps([str(item) for item in inputs])
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()