Skip to Content
SDK Guide

AgentTrust ID SDK Guide

Comprehensive multi-language guide for integrating AgentTrust ID into your AI agents across Python, TypeScript, Go, Java, and Rust.


Table of Contents

  1. Installation
  2. Client Initialization
  3. Agent Registration
  4. Token Issuance & Introspection
  5. Action Checking
  6. Session Management (AgentTrust ID 1.0)
  7. Guard Pattern
  8. Elevation Workflow (AgentTrust ID 1.0)
  9. Approval Management
  10. MCP Proxy
  11. A2A Delegation
  12. Error Handling
  13. Data Privacy

Current SDK Coverage

AgentTrust ID does not currently ship full feature parity across all five SDKs.

Capability AreaPythonTypeScriptGoJavaRust
Core agents, tokens, actions, telemetry, guardYesYesYesYesYes
MCP session init/getYesYesYesYesYes
Approval workflowYesYesYesYesYes
MCP registry / proxyYesYesNoNoNo
A2A task dispatchYesYesNoNoNo
Delegation CRUDYesYesYesNoNo
Delegated session bridgeYesYesYesNoNo
API token -> AgentTrust ID session bridgeYesYesYesNoNo
Federation provider / token / session APIsYesYesYesNoNo
SIEM streaming destination APIsYesYesYesNoNo

Use Go, Python, or TypeScript when you need the currently wired beta surface. Java and Rust remain focused on the core agent, token, action, session, and approval flows.


1. Installation

Python

pip install agenttrustid

With LangChain support:

pip install agenttrustid[langchain]

TypeScript

npm install @agenttrustid/sdk

Go

go get github.com/agenttrustid/sdk/go

Java (Maven)

<dependency> <groupId>id.agenttrust</groupId> <artifactId>agenttrustid</artifactId> <version>0.3.0</version> </dependency>

Java (Gradle)

implementation 'id.agenttrust:agenttrustid:0.3.0'

Rust

cargo add agenttrustid

2. Client Initialization

All SDKs support two initialization methods: explicit configuration and environment variables.

Environment Variables

All SDKs read the same environment variables:

export AGENTTRUST_URL=http://localhost:8080 # Gateway URL (or AGENTTRUST_BASE_URL) export AGENTTRUST_API_KEY=sk_live_... # Organization API key

Get an API Key

  1. Log into the AgentTrust ID dashboard at http://localhost:3002
  2. Go to Settings > API Keys > Create Key
  3. Copy the key (sk_live_...) — it is only shown once

Constructor Examples

Python

from agenttrustid import AgentTrustClient # From environment variables client = AgentTrustClient.from_env() # Explicit configuration client = AgentTrustClient( base_url="http://localhost:8080", api_key="sk_live_...", )

TypeScript

import { AgentTrustClient } from '@agenttrustid/sdk'; // From environment variables const client = AgentTrustClient.fromEnv(); // Explicit configuration const client = new AgentTrustClient({ baseUrl: 'http://localhost:8080', apiKey: 'sk_live_...', });

Go

import ati "github.com/agenttrustid/sdk/go" // From environment variables client := ati.FromEnv() // Explicit configuration client := ati.NewClient( ati.WithBaseURL("http://localhost:8080"), ati.WithAPIKey("sk_live_..."), )

Java

import id.agenttrust.sdk.AgentTrustClient; // From environment variables AgentTrustClient client = AgentTrustClient.fromEnv(); // Builder pattern AgentTrustClient client = AgentTrustClient.builder() .baseUrl("http://localhost:8080") .apiKey("sk_live_...") .build();

Rust

use agenttrust_sdk::AgentTrustClient; // From environment variables let client = AgentTrustClient::from_env()?; // Builder pattern let client = AgentTrustClient::builder() .base_url("http://localhost:8080") .api_key("sk_live_...") .build()?;

3. Agent Registration

When you create an agent, AgentTrust ID creates an org-scoped agent record with framework, capability, metadata, and public-key fields. The current create path returns the agent record; it does not return a private key or certificate object.

Python

agent = client.agents.create( name="research-bot", framework="langchain", capabilities=["web:search", "files:read"], ) print(f"Agent ID: {agent.id}") print(f"Status: {agent.status}")

TypeScript

const agent = await client.agents.create({ name: 'research-bot', framework: 'langchain', capabilities: ['web:search', 'files:read'], }); console.log(`Agent ID: ${agent.id}`); console.log(`Status: ${agent.status}`);

Go

import "fmt" agent, err := client.Agents.Create(ctx, ati.CreateAgentRequest{ Name: "research-bot", Framework: "langchain", Capabilities: []string{"web:search", "files:read"}, }) if err != nil { log.Fatal(err) } fmt.Printf("Agent ID: %s\n", agent.ID)

Java

import id.agenttrust.sdk.models.Agent; import id.agenttrust.sdk.models.CreateAgentRequest; import java.util.List; Agent agent = client.agents().create( CreateAgentRequest.builder() .name("research-bot") .framework("langchain") .capabilities(List.of("web:search", "files:read")) .build() ); System.out.println("Agent ID: " + agent.getId());

Rust

let agent = client.agents().create(&agenttrust::CreateAgentRequest { name: "research-bot".to_string(), framework: "langchain".to_string(), capabilities: vec!["web:search".to_string(), "files:read".to_string()], ..Default::default() })?; println!("Agent ID: {}", agent.id);

Listing and Revoking Agents

Python

# List all agents agents = client.agents.list() for a in agents: print(f"{a.name} ({a.status})") # Revoke an agent (immediate, cannot be undone) client.agents.revoke(agent.id, reason="compromised")

TypeScript

// List all agents const agents = await client.agents.list(); agents.forEach(a => console.log(`${a.name} (${a.status})`)); // Revoke an agent await client.agents.revoke(agent.id, 'compromised');

4. Token Issuance & Introspection

Issue short-lived opaque capability tokens for agents to access specific resources. Tokens are cached by default to reduce latency and must be introspected server-side; they are not JWTs and cannot be validated client-side.

Python

# Issue a token token = client.tokens.issue( agent_id=agent.id, scopes=["files:read"], ttl_seconds=300, ) print(f"Token: {token.token}") # Introspect a token result = client.tokens.introspect( token=token.token, target="mcp://filesystem", ) print(f"Active: {result.active}")

TypeScript

// Issue a token const token = await client.tokens.issue({ agentId: agent.id, scopes: ['files:read'], ttlSeconds: 300, }); console.log(`Token: ${token.token}`); // Introspect a token const result = await client.tokens.introspect({ token: token.token, target: 'mcp://filesystem', }); console.log(`Active: ${result.active}`);

Go

// Issue a token token, err := client.Tokens.Issue(ctx, ati.IssueTokenRequest{ AgentID: agent.ID, Scope: []string{"files:read"}, TTL: 300, }) if err != nil { log.Fatal(err) } fmt.Printf("Token: %s\n", token.Token) // Introspect a token result, err := client.Tokens.Introspect(ctx, ati.IntrospectTokenRequest{ Token: token.Token, Target: "mcp://filesystem", }) if err != nil { log.Fatal(err) } fmt.Printf("Active: %v\n", result.Active)

Java

// Issue a token Token token = client.tokens().issue( IssueTokenRequest.builder() .agentId(agent.getId()) .scope(List.of("files:read")) .ttl(300) .build() ); System.out.println("Token: " + token.getToken()); // Introspect a token IntrospectionResult result = client.tokens().introspect( IntrospectTokenRequest.builder() .token(token.getToken()) .target("mcp://filesystem") .build() ); System.out.println("Active: " + result.isActive());

Rust

// Issue a token let token = client.tokens().issue(&agenttrust::IssueTokenRequest { agent_id: agent.id.clone(), scope: vec!["files:read".to_string()], audience: vec![], ttl: 300, })?; println!("Token: {}", token.token); // Introspect a token let result = client.tokens().introspect(&agenttrust::IntrospectTokenRequest { token: token.token.clone(), target: Some("mcp://filesystem".to_string()), required_scopes: vec![], })?; println!("Active: {}", result.active);

5. Action Checking

Pre-flight authorization check before every tool call. Uses Fast Guard rules optimized for low latency (typical, not guaranteed; varies by deployment). The action_effect parameter hints to Guardian how to classify the action: read, mutating, destructive, or admin. If omitted, the backend auto-classifies based on the tool name.

Python

result = client.actions.check( agent_id=agent.id, action="tool_call", tool_name="web_search", tool_input_summary="AI safety papers", session_id="sess-abc", action_effect="read", ) print(f"Allowed: {result.allowed}, Confidence: {result.confidence}") # If elevation is required if result.elevation_required: print(f"Approval needed: {result.approval_id}")

TypeScript

const result = await client.actions.check({ agentId: agent.id, toolName: 'web_search', toolInputSummary: 'AI safety papers', sessionId: 'sess-abc', actionEffect: 'read', }); console.log(`Allowed: ${result.allowed}, Confidence: ${result.confidence}`); // If elevation is required if (result.elevationRequired) { console.log(`Approval needed: ${result.approvalId}`); }

Go

result, err := client.Actions.Check(ctx, ati.ActionCheckRequest{ AgentID: agent.ID, Action: "tool_call", ToolName: "web_search", ToolInputSummary: "AI safety papers", SessionID: "sess-abc", ActionEffect: "read", }) if err != nil { log.Fatal(err) } fmt.Printf("Allowed: %v, Confidence: %v\n", result.Allowed, result.Confidence) if result.ElevationRequired { fmt.Printf("Approval needed: %s\n", result.ApprovalID) }

Java

ActionCheckResult result = client.actions().check( ActionsAPI.ActionCheckRequest.builder() .agentId(agent.getId()) .action("tool_call") .toolName("web_search") .toolInputSummary("AI safety papers") .sessionId("sess-abc") .actionEffect("read") .build() ); System.out.println("Allowed: " + result.isAllowed()); if (result.isElevationRequired()) { System.out.println("Approval needed: " + result.getApprovalId()); }

Rust

let result = client.actions().check(&agenttrust::ActionCheckRequest { agent_id: agent.id.clone(), action: "tool_call".to_string(), tool_name: "web_search".to_string(), tool_input_summary: "AI safety papers".to_string(), session_id: "sess-abc".to_string(), action_effect: Some("read".to_string()), })?; println!("Allowed: {}, Confidence: {:?}", result.allowed, result.confidence); if result.elevation_required == Some(true) { println!("Approval needed: {:?}", result.approval_id); }

6. Session Management (AgentTrust ID 1.0)

Sessions track agent-to-server connections with call metrics, scope ceilings, and session modes. Initialize a session when an agent connects to an MCP server, then reference the session ID in all subsequent action checks.

Session Modes

ModeDescription
openAgent can call any tool within its capabilities
restrictedAgent is limited to the scope ceiling set at session init
supervisedAll calls are logged; destructive actions require elevation

Init and Get Session

Python

# Initialize a session session = client.sessions.init_session( agent_id=agent.id, server_id="mcp-code-tools", ) print(f"Session ID: {session.session_id}") print(f"Mode: {session.mode}") print(f"Scope ceiling: {session.scope_ceiling}") # Retrieve session state later session = client.sessions.get_session(session.session_id) print(f"Total calls: {session.total_calls}, Denied: {session.denied_calls}")

TypeScript

// Initialize a session const session = await client.sessions.initSession(agent.id, 'mcp-code-tools'); console.log(`Session ID: ${session.sessionId}`); console.log(`Mode: ${session.mode}`); console.log(`Scope ceiling: ${session.scopeCeiling}`); // Retrieve session state later const current = await client.sessions.getSession(session.sessionId); console.log(`Total calls: ${current.totalCalls}, Denied: ${current.deniedCalls}`);

Go

// Initialize a session session, err := client.Sessions.InitSession(ctx, ati.InitSessionRequest{ AgentID: agent.ID, ServerID: "mcp-code-tools", }) if err != nil { log.Fatal(err) } fmt.Printf("Session ID: %s, Mode: %s\n", session.SessionID, session.Mode) // Retrieve session state later current, err := client.Sessions.GetSession(ctx, session.SessionID) if err != nil { log.Fatal(err) } fmt.Printf("Total calls: %d, Denied: %d\n", current.TotalCalls, current.DeniedCalls)

Java

// Initialize a session Session session = client.sessions().initSession(agent.getId(), "mcp-code-tools"); System.out.println("Session ID: " + session.getSessionId()); System.out.println("Mode: " + session.getMode()); // Retrieve session state later Session current = client.sessions().getSession(session.getSessionId()); System.out.println("Total calls: " + current.getTotalCalls());

Rust

// Initialize a session let session = client.sessions().init_session(&agent.id, "mcp-code-tools")?; println!("Session ID: {}", session.session_id); println!("Mode: {}", session.mode); println!("Scope ceiling: {:?}", session.scope_ceiling); // Retrieve session state later let current = client.sessions().get_session(&session.session_id)?; println!("Total calls: {}, Denied: {}", current.total_calls, current.denied_calls);

7. Guard Pattern

The Guard wraps action-check and telemetry into a simple check-then-report pattern. It is the recommended way to integrate AgentTrust ID into agents that use raw OpenAI, Anthropic, or other SDKs directly (not framework callbacks).

Guard Lifecycle

  1. Create a guard with an agent ID and options
  2. Call check(tool, summary, effect) before each tool call
  3. Execute the tool
  4. Call report(tool, success, duration_ms) after each tool call
  5. Call flush()/close() when the agent is done

Guard Options

OptionDefaultDescription
session_idauto-generated UUIDSession ID for telemetry correlation
block_on_denytrueRaise/return an error when Guardian denies an action
fail_openfalseIf Guardian is unreachable: false = block, true = allow

Full Guard Workflow

Python

from agenttrustid import AgentTrustClient from agenttrustid.guard import ATIGuard from agenttrustid.exceptions import AgentTrustError client = AgentTrustClient.from_env() # Use as context manager for automatic cleanup with ATIGuard(client, agent_id=agent.id, fail_open=False) as guard: # Before tool call -- raises AgentTrustError if denied guard.check("web_search", input_summary="AI news", action_effect="read") # Execute the tool start = time.time() result = do_web_search("AI news") duration_ms = int((time.time() - start) * 1000) # After tool call -- fire-and-forget telemetry guard.report("web_search", success=True, duration_ms=duration_ms) # guard.close() is called automatically by the context manager

TypeScript

import { AgentTrustClient, AgentTrustGuard } from '@agenttrustid/sdk'; const client = AgentTrustClient.fromEnv(); const guard = new AgentTrustGuard(client, agent.id, { failOpen: false, blockOnDeny: true, }); try { // Before tool call -- throws AgentTrustError if denied await guard.check('web_search', 'AI news', 'read'); // Execute the tool const start = Date.now(); const result = await doWebSearch('AI news'); const durationMs = Date.now() - start; // After tool call -- fire-and-forget telemetry guard.report('web_search', true, durationMs); } finally { // Flush remaining telemetry await guard.flush(); }

Go

import ati "github.com/agenttrustid/sdk/go" client := ati.FromEnv() guard := ati.NewGuard(client, agent.ID, ati.WithFailOpen(false), ati.WithBlockOnDeny(true), ati.WithActionEffect("read"), // default effect for all checks ) defer guard.Close(ctx) // Before tool call -- returns error if denied if err := guard.Check(ctx, "web_search", "AI news"); err != nil { log.Printf("Denied: %v", err) return } // Execute the tool start := time.Now() result, err := doWebSearch("AI news") durationMs := int(time.Since(start).Milliseconds()) // After tool call -- fire-and-forget telemetry guard.Report("web_search", err == nil, durationMs)

Java

import id.agenttrust.sdk.AgentTrustClient; import id.agenttrust.sdk.AgentTrustGuard; AgentTrustClient client = AgentTrustClient.fromEnv(); // Use try-with-resources for automatic cleanup try (AgentTrustGuard guard = new AgentTrustGuard(client, agent.getId())) { // Before tool call -- throws AgentTrustException if denied guard.check("web_search", "AI news", "read"); // Execute the tool long start = System.currentTimeMillis(); Object result = doWebSearch("AI news"); int durationMs = (int) (System.currentTimeMillis() - start); // After tool call -- fire-and-forget telemetry guard.report("web_search", true, durationMs); } // guard.close() is called automatically

Rust

use agenttrust_sdk::{AgentTrustClient, AgentTrustGuard}; let client = AgentTrustClient::from_env()?; let guard = AgentTrustGuard::builder(client, &agent.id) .fail_open(false) .block_on_deny(true) .action_effect("read") // default effect for all checks .build(); // Before tool call -- returns Err if denied guard.check("web_search", "AI news")?; // Or with an explicit per-call effect (overrides default) guard.check_with_effect("db_delete", "drop table users", "destructive")?; // Execute the tool let start = std::time::Instant::now(); let result = do_web_search("AI news"); let duration_ms = start.elapsed().as_millis() as u64; // After tool call -- fire-and-forget telemetry guard.report("web_search", true, duration_ms); // Flush remaining telemetry (also happens automatically on Drop) guard.flush()?;

8. Elevation Workflow (AgentTrust ID 1.0)

When an agent tries to perform a high-impact action (destructive, admin), Guardian may require elevated approval. The flow is:

  1. guard.check() returns an elevation error with an approval_id
  2. A human or automated system approves or denies via client.approvals.approve()
  3. guard.check() is retried — this time it succeeds (within the elevation window)

End-to-End Elevation Example

Python

from agenttrustid.exceptions import AgentTrustError try: guard.check("db_delete", "drop table users", action_effect="destructive") except AgentTrustError as e: if e.code == "ELEVATION_REQUIRED": approval_id = e.details["approval_id"] print(f"Elevation required. Approval ID: {approval_id}") # A human or automated system approves the action client.approvals.approve(approval_id, decided_by="admin@company.com") # Retry the check -- now it succeeds within the elevation window guard.check("db_delete", "drop table users", action_effect="destructive") else: raise # Re-raise ACTION_DENIED or other errors

TypeScript

import { AgentTrustError } from '@agenttrustid/sdk'; try { await guard.check('db_delete', 'drop table users', 'destructive'); } catch (error) { if (error instanceof AgentTrustError && error.code === 'ELEVATION_REQUIRED') { const approvalId = error.details?.approvalId; console.log(`Elevation required. Approval ID: ${approvalId}`); // A human or automated system approves the action await client.approvals.approve(approvalId, 'admin@company.com'); // Retry the check await guard.check('db_delete', 'drop table users', 'destructive'); } else { throw error; } }

Go

import "errors" err := guard.Check(ctx, "db_delete", "drop table users") if err != nil { var elevErr *ati.ElevationRequiredError if errors.As(err, &elevErr) { fmt.Printf("Elevation required. Approval ID: %s\n", elevErr.ApprovalID) // A human or automated system approves the action if err := client.Approvals.Approve(ctx, elevErr.ApprovalID, "admin@company.com"); err != nil { log.Fatal(err) } // Retry the check if err := guard.Check(ctx, "db_delete", "drop table users"); err != nil { log.Fatal(err) } } else { log.Fatal(err) } }

Java

import id.agenttrust.sdk.exceptions.ElevationRequiredException; import id.agenttrust.sdk.exceptions.AgentTrustException; try { guard.check("db_delete", "drop table users", "destructive"); } catch (ElevationRequiredException e) { String approvalId = e.getApprovalId(); System.out.println("Elevation required. Approval ID: " + approvalId); // A human or automated system approves the action client.approvals().approve(approvalId, "admin@company.com"); // Retry the check guard.check("db_delete", "drop table users", "destructive"); } catch (AgentTrustException e) { // Handle ACTION_DENIED or other errors throw e; }

Rust

use agenttrust_sdk::AgentTrustError; match guard.check_with_effect("db_delete", "drop table users", "destructive") { Ok(()) => { // Proceed with the action } Err(AgentTrustError::ElevationRequired { approval_id, message }) => { println!("Elevation required: {}. Approval ID: {}", message, approval_id); // A human or automated system approves the action client.approvals().approve(&approval_id, "admin@company.com")?; // Retry the check guard.check_with_effect("db_delete", "drop table users", "destructive")?; } Err(e) => return Err(e), }

9. Approval Management

Manage elevation approval requests programmatically. Approvals are created automatically by Guardian when an action requires elevation. They expire after a configurable window (default: 5 minutes).

Approve, Deny, and Get

Python

# Approve an elevation request client.approvals.approve(approval_id, decided_by="admin@company.com") # Deny an elevation request client.approvals.deny(approval_id, decided_by="security@company.com") # Check approval status approval = client.approvals.get(approval_id) print(f"Status: {approval.status}") # pending | approved | denied | expired print(f"Action: {approval.action_name}, Effect: {approval.action_effect}") print(f"Decided by: {approval.decided_by}")

TypeScript

// Approve an elevation request await client.approvals.approve(approvalId, 'admin@company.com'); // Deny an elevation request await client.approvals.deny(approvalId, 'security@company.com'); // Check approval status const approval = await client.approvals.get(approvalId); console.log(`Status: ${approval.status}`); console.log(`Action: ${approval.actionName}, Effect: ${approval.actionEffect}`); console.log(`Decided by: ${approval.decidedBy}`);

Go

// Approve an elevation request err := client.Approvals.Approve(ctx, approvalID, "admin@company.com") // Deny an elevation request err := client.Approvals.Deny(ctx, approvalID, "security@company.com") // Check approval status approval, err := client.Approvals.Get(ctx, approvalID) if err != nil { log.Fatal(err) } fmt.Printf("Status: %s, Action: %s\n", approval.Status, approval.ActionName)

Java

// Approve an elevation request client.approvals().approve(approvalId, "admin@company.com"); // Deny an elevation request client.approvals().deny(approvalId, "security@company.com"); // Check approval status ApprovalRequest approval = client.approvals().get(approvalId); System.out.println("Status: " + approval.getStatus()); System.out.println("Action: " + approval.getActionName());

Rust

// Approve an elevation request client.approvals().approve(&approval_id, "admin@company.com")?; // Deny an elevation request client.approvals().deny(&approval_id, "security@company.com")?; // Check approval status let approval = client.approvals().get(&approval_id)?; println!("Status: {}, Action: {}", approval.status, approval.action_name);

10. MCP Proxy

Register external MCP tool servers and call tools through AgentTrust ID’s security proxy. Every tools/call is Guardian-checked before forwarding to the upstream server. The session_id parameter correlates calls with an AgentTrust ID session.

MCP proxy integration is available in the Python and TypeScript SDKs. Go, Java, and Rust SDKs use the action-check API directly for MCP-related authorization.

Python

# Register an MCP tool server server = client.mcp.register_server( name="code-tools", url="https://mcp-server.example.com", capabilities=["run_code", "lint", "format"], ) print(f"Server ID: {server.id}") # Call a tool through the proxy (Guardian checks apply) result = client.mcp.call_tool( server_id=server.id, method="tools/call", params={ "name": "run_code", "arguments": {"language": "python", "code": "print('hello')"}, }, session_id=session.session_id, # correlate with AgentTrust ID session ) print(result) # List registered servers servers = client.mcp.list_servers() # Remove a server client.mcp.remove_server(server.id)

TypeScript

// Register an MCP server const server = await client.mcp.registerServer({ name: 'code-tools', url: 'https://mcp-server.example.com', capabilities: ['run_code', 'lint', 'format'], }); // Call a tool through the proxy with session correlation const result = await client.mcp.callTool( server.id, 'tools/call', { name: 'run_code', arguments: { language: 'python', code: "print('hello')" } }, session.sessionId // correlate with AgentTrust ID session ); // List servers const servers = await client.mcp.listServers(); // Remove a server await client.mcp.removeServer(server.id);

11. A2A Delegation

Agent Cards

Generate, publish, and retrieve A2A-compliant Agent Cards. Published cards are available at /a2a/agents/{id}/agent.json.

Python

# Generate an Agent Card card = client.agent_cards.generate(agent_id=agent.id) print(f"{card.name}: {card.capabilities}") # Publish the card (makes it publicly discoverable) client.agent_cards.publish(agent_id=agent.id) # Retrieve a public Agent Card (no auth required) public_card = client.agent_cards.get_public(agent_id=agent.id)

TypeScript

// Generate an Agent Card const card = await client.agentCards.generate(agent.id); // Publish the card await client.agentCards.publish(agent.id); // Retrieve a public card const publicCard = await client.agentCards.getPublic(agent.id);

A2A Task Dispatch

Send work to another agent using the A2A v1.0 message/send method. The v0.3 send_task / sendTask methods remain available for back-compatibility.

Python

# A2A v1.0: send a message to a target agent task = client.a2a.send_message( agent_b.id, "Search for recent AI safety papers and summarize the top 3", ) print(f"Task {task.id}") # Poll for completion, then cancel if needed status = client.a2a.get_task(task.id) client.a2a.cancel_task(task.id) # Back-compat (A2A v0.3): # client.a2a.send_task(source_agent_id=..., target_agent_id=..., message=...)

TypeScript

// A2A v1.0: send a message to a target agent const task = await client.a2a.sendMessage(agentB.id, { text: 'Search for recent AI safety papers', }); const status = await client.a2a.getTask(task.id); await client.a2a.cancelTask(task.id); // Back-compat (A2A v0.3): // client.a2a.sendTask({ sourceAgentId, targetAgentId, message })

Delegation Chains

Delegate capabilities from one agent to another with scope narrowing. Delegations are time-bounded, revocable, and audited.

Python

# Delegate web_search capability for 1 hour delegation = client.delegations.create( from_agent_id=agent_a.id, to_agent_id=agent_b.id, scope=["web_search"], ttl_seconds=3600, ) print(f"Delegation {delegation.id} expires at {delegation.expires_at}") # List delegations delegations = client.delegations.list() # Revoke a delegation client.delegations.revoke(delegation.id)

TypeScript

// Create a delegation const delegation = await client.delegations.create({ fromAgentId: agentA.id, toAgentId: agentB.id, scope: ['web_search'], ttlSeconds: 3600, }); // List delegations const delegations = await client.delegations.list(); // Revoke a delegation await client.delegations.revoke(delegation.id);

Delegated Session Bridge

Use a delegation to bootstrap a local AgentTrust ID session before making mediated calls.

Python

session = client.delegations.init_session(delegation.id) print(session.session_id, session.delegation_id, session.scope_ceiling)

TypeScript

const session = await client.delegations.initSession(delegation.id); console.log(session.sessionId, session.delegationId, session.scopeCeiling);

Go

session, err := client.Delegations.InitSession(ctx, delegation.ID) if err != nil { log.Fatal(err) } fmt.Println(session.SessionID, session.DelegationID, session.ScopeCeiling)

API Session Bridge

Bridge an already-issued AgentTrust ID API token into a local AgentTrust ID session.

Python

session = client.sessions.init_api_session(token.token) print(session.session_id, session.source)

TypeScript

const session = await client.sessions.initApiSession(token.token); console.log(session.sessionId, session.source);

Go

session, err := client.Sessions.InitAPISession(ctx, ati.InitAPISessionRequest{ Token: token.Token, }) if err != nil { log.Fatal(err) }

Federation

Register OIDC providers, verify external ID tokens, and bridge them into local AgentTrust ID sessions.

Python

provider = client.federation.register_provider( issuer="https://issuer.example.com", name="Example Workforce", trust_level="high", ) result = client.federation.verify_token(token="eyJ...") session = client.federation.init_session(token="eyJ...")

TypeScript

const provider = await client.federation.registerProvider({ issuer: 'https://issuer.example.com', name: 'Example Workforce', trustLevel: 'high', }); const result = await client.federation.verifyToken({ token: 'eyJ...' }); const session = await client.federation.initSession({ token: 'eyJ...' });

Go

provider, err := client.Federation.RegisterProvider(ctx, ati.RegisterProviderRequest{ Issuer: "https://issuer.example.com", Name: "Example Workforce", TrustLevel: "high", }) if err != nil { log.Fatal(err) } _, _ = client.Federation.VerifyToken(ctx, ati.VerifyFederatedTokenRequest{Token: "eyJ..."}) _, _ = client.Federation.InitSession(ctx, ati.InitFederatedSessionRequest{Token: "eyJ..."}) _ = provider

SIEM Streaming

Configure audit-event delivery to external security platforms.

Python

destination = client.streaming.create( name="Splunk HEC", destination_type="splunk", endpoint_url="https://splunk.example.com/services/collector", auth_token="secret", ) logs = client.streaming.delivery_log(destination.id) client.streaming.test(destination.id)

TypeScript

const destination = await client.streaming.create({ name: 'Splunk HEC', destinationType: 'splunk', endpointUrl: 'https://splunk.example.com/services/collector', authToken: 'secret', }); const logs = await client.streaming.deliveryLog(destination.id); await client.streaming.test(destination.id);

Go

destination, err := client.Streaming.Create(ctx, ati.CreateSIEMDestinationRequest{ Name: "Splunk HEC", DestinationType: "splunk", EndpointURL: "https://splunk.example.com/services/collector", AuthToken: "secret", }) if err != nil { log.Fatal(err) } _, _ = client.Streaming.DeliveryLog(ctx, destination.ID) _ = client.Streaming.Test(ctx, destination.ID)

12. Error Handling

All SDKs use structured error types with machine-readable codes.

Error Codes

CodeDescription
ACTION_DENIEDGuardian denied the action based on policy
ELEVATION_REQUIREDAction requires elevated approval before proceeding
GUARDIAN_UNAVAILABLEGuardian service is unreachable (and fail_open is false)
AUTH_FAILEDInvalid or missing API key (HTTP 401)
AUTH_DENIEDInsufficient permissions (HTTP 403)
NOT_FOUNDResource not found (HTTP 404)
VALIDATION_ERRORRequest validation failed (HTTP 400)
NETWORK_ERRORNetwork-level failure (connection refused, timeout, DNS)

Language-Specific Error Types

Python

from agenttrustid.exceptions import AgentTrustError, AuthenticationError, NetworkError try: guard.check("dangerous_tool", "data", action_effect="destructive") except AgentTrustError as e: print(f"Code: {e.code}") # "ACTION_DENIED" | "ELEVATION_REQUIRED" | ... print(f"Message: {e.message}") print(f"Details: {e.details}") # {"approval_id": "..."} when ELEVATION_REQUIRED

Exception hierarchy:

  • AgentTrustError — base class with code, message, details
    • AuthenticationError — HTTP 401
    • AuthorizationError — HTTP 403
    • NetworkError — connection failures
    • ValidationError — HTTP 400
    • TokenExpiredError — expired tokens
    • AgentRevokedError — revoked agents

TypeScript

import { AgentTrustError, AuthenticationError, NetworkError } from '@agenttrustid/sdk'; try { await guard.check('dangerous_tool', 'data', 'destructive'); } catch (error) { if (error instanceof AgentTrustError) { console.log(`Code: ${error.code}`); // "ACTION_DENIED" | "ELEVATION_REQUIRED" | ... console.log(`Message: ${error.message}`); console.log(`Details:`, error.details); // { approvalId: "..." } when ELEVATION_REQUIRED } }

Error classes:

  • AgentTrustError — base class with code, message, details
    • AuthenticationError — HTTP 401
    • AuthorizationError — HTTP 403
    • NetworkError — connection failures
    • ValidationError — HTTP 400

Go

import "errors" err := guard.Check(ctx, "dangerous_tool", "data") if err != nil { var elevErr *ati.ElevationRequiredError var atiErr *ati.AgentTrustError var netErr *ati.NetworkError switch { case errors.As(err, &elevErr): fmt.Printf("Elevation required, approval ID: %s\n", elevErr.ApprovalID) case errors.As(err, &netErr): fmt.Printf("Network error: %s\n", netErr.Message) case errors.As(err, &atiErr): fmt.Printf("AgentTrust ID error [%s]: %s\n", atiErr.Code, atiErr.Message) default: fmt.Printf("Unknown error: %v\n", err) } }

Error types:

  • *AgentTrustError — base type with Code and Message fields
  • *ElevationRequiredError — embeds AgentTrustError, adds ApprovalID field
  • *NetworkError — embeds AgentTrustError, for connection failures

Java

import id.agenttrust.sdk.exceptions.AgentTrustException; import id.agenttrust.sdk.exceptions.ElevationRequiredException; import id.agenttrust.sdk.exceptions.NetworkException; try { guard.check("dangerous_tool", "data", "destructive"); } catch (ElevationRequiredException e) { System.out.println("Approval ID: " + e.getApprovalId()); } catch (NetworkException e) { System.out.println("Network error: " + e.getMessage()); } catch (AgentTrustException e) { System.out.println("Code: " + e.getCode()); // "ACTION_DENIED", etc. System.out.println("Status: " + e.getStatus()); // HTTP status code }

Exception hierarchy:

  • AgentTrustException — base class with getCode(), getMessage(), getStatus()
    • ElevationRequiredException — adds getApprovalId()
    • NetworkException — connection failures

Rust

use agenttrust_sdk::AgentTrustError; match guard.check_with_effect("dangerous_tool", "data", "destructive") { Ok(()) => println!("Allowed"), Err(AgentTrustError::ActionDenied { message, check_id }) => { println!("Denied: {}, check_id: {:?}", message, check_id); } Err(AgentTrustError::ElevationRequired { message, approval_id }) => { println!("Elevation required: {}, approval_id: {}", message, approval_id); } Err(AgentTrustError::GuardianUnavailable { message }) => { println!("Guardian unreachable: {}", message); } Err(AgentTrustError::Authentication { message, .. }) => { println!("Auth failed: {}", message); } Err(AgentTrustError::Network(e)) => { println!("Network error: {}", e); } Err(e) => println!("Other error: {}", e), }

Error enum variants:

  • AgentTrustError::ActionDenied { message, check_id } — Guardian denied the action
  • AgentTrustError::ElevationRequired { message, approval_id } — elevation needed
  • AgentTrustError::GuardianUnavailable { message } — Guardian is unreachable
  • AgentTrustError::Authentication { message, status } — HTTP 401
  • AgentTrustError::Authorization { message, status } — HTTP 403
  • AgentTrustError::Validation { message, status } — HTTP 400
  • AgentTrustError::NotFound { message, status } — HTTP 404
  • AgentTrustError::Network(reqwest::Error) — connection failures
  • AgentTrustError::Api { message, code, status } — other HTTP errors
  • AgentTrustError::Json(serde_json::Error) — serialization failures

13. Data Privacy

The SDK is private by default. No tool inputs, outputs, or prompts are sent to AgentTrust ID unless explicitly enabled.

DataSent?Notes
Tool nameYese.g., web_search
Tool inputNo (by default)Only character count unless log_inputs=True
Tool outputNeverOnly a success/failure boolean
DurationYesMilliseconds
Error typeYesException class name only, not message
Full promptsNeverSDK has no access to LLM prompts

When log_inputs=True (Python) or equivalent is enabled, inputs are truncated to 200 characters before sending. The tool_input_summary parameter in action checks is also capped at 200 characters.

What the Guard Sends

Each check() call sends:

  • Agent ID
  • Tool name
  • Truncated input summary (max 200 chars, empty by default)
  • Session ID
  • Action effect hint (if provided)

Each report() call buffers:

  • Tool name
  • Success boolean
  • Duration in milliseconds
  • Error type (class name only, if failed)

Telemetry is batched (up to 10 events) and sent asynchronously. It is best-effort and never crashes your agent.


Quick Reference: Method Signatures

OperationPythonTypeScriptGoJavaRust
Session initsessions.init_session(agent_id, server_id)sessions.initSession(agentId, serverId)Sessions.InitSession(ctx, req)sessions().initSession(agentId, serverId)sessions().init_session(agent_id, server_id)
API session initsessions.init_api_session(token)sessions.initApiSession(token)Sessions.InitAPISession(ctx, req)Not yet supportedNot yet supported
Session getsessions.get_session(session_id)sessions.getSession(sessionId)Sessions.GetSession(ctx, sessionID)sessions().getSession(sessionId)sessions().get_session(session_id)
Approveapprovals.approve(approval_id, decided_by)approvals.approve(approvalId, decidedBy)Approvals.Approve(ctx, approvalID, decidedBy)approvals().approve(approvalId, decidedBy)approvals().approve(approval_id, decided_by)
Denyapprovals.deny(approval_id, decided_by)approvals.deny(approvalId, decidedBy)Approvals.Deny(ctx, approvalID, decidedBy)approvals().deny(approvalId, decidedBy)approvals().deny(approval_id, decided_by)
Approval getapprovals.get(approval_id)approvals.get(approvalId)Approvals.Get(ctx, approvalID)approvals().get(approvalId)approvals().get(approval_id)
Delegation createdelegations.create(...)delegations.create({...})Delegations.Create(ctx, req)Not yet supportedNot yet supported
Delegation session initdelegations.init_session(delegation_id)delegations.initSession(delegationId)Delegations.InitSession(ctx, delegationID)Not yet supportedNot yet supported
Federation provider registerfederation.register_provider(...)federation.registerProvider({...})Federation.RegisterProvider(ctx, req)Not yet supportedNot yet supported
Federation session initfederation.init_session(token, issuer_hint=None)federation.initSession({ token, issuerHint? })Federation.InitSession(ctx, req)Not yet supportedNot yet supported
SIEM destination createstreaming.create(...)streaming.create({...})Streaming.Create(ctx, req)Not yet supportedNot yet supported
Guard checkcheck(tool, summary, effect)check(tool, summary, effect)Check(ctx, tool, summary)check(tool, summary, effect)check(tool, summary) / check_with_effect(tool, summary, effect)
Guard reportreport(tool, success, duration_ms)report(tool, success, durationMs)Report(tool, success, durationMs)report(tool, success, durationMs)report(tool, success, duration_ms)
Guard flushclose()flush()Close(ctx) / Flush(ctx)close() / flush()flush() (also on Drop)

FAQ

Q: Does the SDK add latency to tool calls? Action-check adds a small per-call latency (typical, not guaranteed; varies by deployment). Telemetry is async and is off the synchronous call path.

Q: What happens if I don’t set an API key? All API calls will fail with 401 Unauthorized. The sk_live_ key authenticates your organization.

Q: Can I use this with CrewAI or AutoGen? Yes. Python provides framework-specific callbacks for CrewAI and AutoGen. Install with pip install agenttrustid[langchain] for LangChain support.

Q: Do I need OAuth for A2A or MCP features? For A2A task dispatch between agents within your organization, API key auth is sufficient. OAuth 2.1 is required when external MCP clients connect to AgentTrust ID’s proxy or when federating with agents outside your org.

Q: Is the Rust SDK async? The Rust SDK uses reqwest::blocking for synchronous operation. Async support is planned for a future release.

Q: How do I handle telemetry failures? Telemetry is always best-effort. If the telemetry endpoint is unreachable, events are dropped silently. Your agent never crashes due to telemetry.

Last updated on