AgentTrust ID SDK Guide
Comprehensive multi-language guide for integrating AgentTrust ID into your AI agents across Python, TypeScript, Go, Java, and Rust.
Table of Contents
- Installation
- Client Initialization
- Agent Registration
- Token Issuance & Introspection
- Action Checking
- Session Management (AgentTrust ID 1.0)
- Guard Pattern
- Elevation Workflow (AgentTrust ID 1.0)
- Approval Management
- MCP Proxy
- A2A Delegation
- Error Handling
- Data Privacy
Current SDK Coverage
AgentTrust ID does not currently ship full feature parity across all five SDKs.
| Capability Area | Python | TypeScript | Go | Java | Rust |
|---|---|---|---|---|---|
| Core agents, tokens, actions, telemetry, guard | Yes | Yes | Yes | Yes | Yes |
| MCP session init/get | Yes | Yes | Yes | Yes | Yes |
| Approval workflow | Yes | Yes | Yes | Yes | Yes |
| MCP registry / proxy | Yes | Yes | No | No | No |
| A2A task dispatch | Yes | Yes | No | No | No |
| Delegation CRUD | Yes | Yes | Yes | No | No |
| Delegated session bridge | Yes | Yes | Yes | No | No |
| API token -> AgentTrust ID session bridge | Yes | Yes | Yes | No | No |
| Federation provider / token / session APIs | Yes | Yes | Yes | No | No |
| SIEM streaming destination APIs | Yes | Yes | Yes | No | No |
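Use Python or TypeScript when you need the full beta surface. Go covers delegations, the session bridges, federation, and SIEM streaming, but not the MCP registry/proxy or A2A task dispatch. Java and Rust remain focused on the core agent, token, action, session, and approval flows.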
1. Installation
Python
pip install agenttrustid
With LangChain support:
pip install agenttrustid[langchain]
TypeScript
npm install @agenttrustid/sdk
Go
go get github.com/agenttrustid/sdk/go
Java (Maven)
<dependency>
  <groupId>id.agenttrust</groupId>
  <artifactId>agenttrustid</artifactId>
  <version>0.3.0</version>
</dependency>
Java (Gradle)
implementation 'id.agenttrust:agenttrustid:0.3.0'
Rust
cargo add agenttrustid
2. Client Initialization
All SDKs support two initialization methods: explicit configuration and environment variables.
Environment Variables
All SDKs read the same environment variables:
export AGENTTRUST_URL=http://localhost:8080 # Gateway URL (or AGENTTRUST_BASE_URL)
export AGENTTRUST_API_KEY=sk_live_... # Organization API key
Get an API Key
- Log into the AgentTrust ID dashboard at http://localhost:3002
- Go to Settings > API Keys > Create Key
- Copy the key (sk_live_...); it is only shown once
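The environment variables above can be resolved with a small helper. The following is a minimal sketch, not the SDK's actual from_env() logic: the function name resolve_config is hypothetical, and the precedence of AGENTTRUST_URL over AGENTTRUST_BASE_URL is an assumption, since the guide only says that either name is accepted.

```python
import os
from typing import Mapping, Optional


def resolve_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Resolve the gateway URL and API key from environment-style variables.

    Hypothetical helper for illustration; the SDK's own loader may differ.
    """
    env = os.environ if env is None else env
    # Assumption: AGENTTRUST_URL wins when both URL variables are set.
    base_url = env.get("AGENTTRUST_URL") or env.get("AGENTTRUST_BASE_URL")
    api_key = env.get("AGENTTRUST_API_KEY")
    missing = [
        name
        for name, value in (
            ("AGENTTRUST_URL", base_url),
            ("AGENTTRUST_API_KEY", api_key),
        )
        if not value
    ]
    if missing:
        raise ValueError("missing configuration: " + ", ".join(missing))
    # Strip a trailing slash so later path joins do not produce "//".
    return {"base_url": base_url.rstrip("/"), "api_key": api_key}
```

Failing fast on a missing variable keeps misconfiguration visible at startup rather than at the first gateway call.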
Constructor Examples
Python
from agenttrustid import AgentTrustClient
# From environment variables
client = AgentTrustClient.from_env()
# Explicit configuration
client = AgentTrustClient(
base_url="http://localhost:8080",
api_key="sk_live_...",
)
TypeScript
import { AgentTrustClient } from '@agenttrustid/sdk';
// From environment variables
const client = AgentTrustClient.fromEnv();
// Explicit configuration
const client = new AgentTrustClient({
baseUrl: 'http://localhost:8080',
apiKey: 'sk_live_...',
});
Go
import ati "github.com/agenttrustid/sdk/go"
// From environment variables
client := ati.FromEnv()
// Explicit configuration
client := ati.NewClient(
ati.WithBaseURL("http://localhost:8080"),
ati.WithAPIKey("sk_live_..."),
)
Java
import id.agenttrust.sdk.AgentTrustClient;
// From environment variables
AgentTrustClient client = AgentTrustClient.fromEnv();
// Builder pattern
AgentTrustClient client = AgentTrustClient.builder()
.baseUrl("http://localhost:8080")
.apiKey("sk_live_...")
.build();
Rust
use agenttrust_sdk::AgentTrustClient;
// From environment variables
let client = AgentTrustClient::from_env()?;
// Builder pattern
let client = AgentTrustClient::builder()
.base_url("http://localhost:8080")
.api_key("sk_live_...")
.build()?;
3. Agent Registration
When you create an agent, AgentTrust ID creates an org-scoped agent record with framework, capability, metadata, and public-key fields. The current create path returns the agent record; it does not return a private key or certificate object.
Python
agent = client.agents.create(
name="research-bot",
framework="langchain",
capabilities=["web:search", "files:read"],
)
print(f"Agent ID: {agent.id}")
print(f"Status: {agent.status}")
TypeScript
const agent = await client.agents.create({
name: 'research-bot',
framework: 'langchain',
capabilities: ['web:search', 'files:read'],
});
console.log(`Agent ID: ${agent.id}`);
console.log(`Status: ${agent.status}`);
Go
import (
    "fmt"
    "log"
)
agent, err := client.Agents.Create(ctx, ati.CreateAgentRequest{
Name: "research-bot",
Framework: "langchain",
Capabilities: []string{"web:search", "files:read"},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Agent ID: %s\n", agent.ID)
Java
import id.agenttrust.sdk.models.Agent;
import id.agenttrust.sdk.models.CreateAgentRequest;
import java.util.List;
Agent agent = client.agents().create(
CreateAgentRequest.builder()
.name("research-bot")
.framework("langchain")
.capabilities(List.of("web:search", "files:read"))
.build()
);
System.out.println("Agent ID: " + agent.getId());
Rust
let agent = client.agents().create(&agenttrust::CreateAgentRequest {
name: "research-bot".to_string(),
framework: "langchain".to_string(),
capabilities: vec!["web:search".to_string(), "files:read".to_string()],
..Default::default()
})?;
println!("Agent ID: {}", agent.id);
Listing and Revoking Agents
Python
# List all agents
agents = client.agents.list()
for a in agents:
    print(f"{a.name} ({a.status})")
# Revoke an agent (immediate, cannot be undone)
client.agents.revoke(agent.id, reason="compromised")
TypeScript
// List all agents
const agents = await client.agents.list();
agents.forEach(a => console.log(`${a.name} (${a.status})`));
// Revoke an agent
await client.agents.revoke(agent.id, 'compromised');
4. Token Issuance & Introspection
Issue short-lived opaque capability tokens for agents to access specific resources. Tokens are cached by default to reduce latency and must be introspected server-side; they are not JWTs and cannot be validated client-side.
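The guide does not describe how the SDKs cache tokens, so the following is only a sketch of one plausible approach: a cache keyed by agent and scope set that reissues shortly before expiry. The class name TokenCache, the injected issue callable, and the 5-second safety margin are all assumptions made for illustration.

```python
import time
from typing import Callable, Dict, FrozenSet, Iterable, Tuple


class TokenCache:
    """Cache issued tokens per (agent, scopes) until shortly before expiry.

    Hypothetical sketch; not the SDK's internal cache.
    """

    def __init__(
        self,
        issue: Callable[[str, FrozenSet[str], int], str],
        clock: Callable[[], float] = time.monotonic,
        safety_margin: float = 5.0,
    ) -> None:
        self._issue = issue          # performs the real token issuance
        self._clock = clock          # injectable for testing
        self._margin = safety_margin
        self._entries: Dict[Tuple[str, FrozenSet[str]], Tuple[str, float]] = {}

    def get(self, agent_id: str, scopes: Iterable[str], ttl_seconds: int = 300) -> str:
        key = (agent_id, frozenset(scopes))
        now = self._clock()
        cached = self._entries.get(key)
        # Reuse the token only while it is comfortably inside its lifetime.
        if cached is not None and now < cached[1] - self._margin:
            return cached[0]
        token = self._issue(agent_id, key[1], ttl_seconds)
        self._entries[key] = (token, now + ttl_seconds)
        return token
```

Because the tokens are opaque, the cache tracks expiry from the requested TTL rather than by inspecting the token itself.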
Python
# Issue a token
token = client.tokens.issue(
agent_id=agent.id,
scopes=["files:read"],
ttl_seconds=300,
)
print(f"Token: {token.token}")
# Introspect a token
result = client.tokens.introspect(
token=token.token,
target="mcp://filesystem",
)
print(f"Active: {result.active}")
TypeScript
// Issue a token
const token = await client.tokens.issue({
agentId: agent.id,
scopes: ['files:read'],
ttlSeconds: 300,
});
console.log(`Token: ${token.token}`);
// Introspect a token
const result = await client.tokens.introspect({
token: token.token,
target: 'mcp://filesystem',
});
console.log(`Active: ${result.active}`);
Go
// Issue a token
token, err := client.Tokens.Issue(ctx, ati.IssueTokenRequest{
AgentID: agent.ID,
Scope: []string{"files:read"},
TTL: 300,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Token: %s\n", token.Token)
// Introspect a token
result, err := client.Tokens.Introspect(ctx, ati.IntrospectTokenRequest{
Token: token.Token,
Target: "mcp://filesystem",
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Active: %v\n", result.Active)
Java
// Issue a token
Token token = client.tokens().issue(
IssueTokenRequest.builder()
.agentId(agent.getId())
.scope(List.of("files:read"))
.ttl(300)
.build()
);
System.out.println("Token: " + token.getToken());
// Introspect a token
IntrospectionResult result = client.tokens().introspect(
IntrospectTokenRequest.builder()
.token(token.getToken())
.target("mcp://filesystem")
.build()
);
System.out.println("Active: " + result.isActive());
Rust
// Issue a token
let token = client.tokens().issue(&agenttrust::IssueTokenRequest {
agent_id: agent.id.clone(),
scope: vec!["files:read".to_string()],
audience: vec![],
ttl: 300,
})?;
println!("Token: {}", token.token);
// Introspect a token
let result = client.tokens().introspect(&agenttrust::IntrospectTokenRequest {
token: token.token.clone(),
target: Some("mcp://filesystem".to_string()),
required_scopes: vec![],
})?;
println!("Active: {}", result.active);
5. Action Checking
Run a pre-flight authorization check before every tool call. The check uses Fast Guard rules optimized for low latency (typical, not guaranteed; varies by deployment). The action_effect parameter hints to Guardian how to classify the action: read, mutating, destructive, or admin. If it is omitted, the backend auto-classifies the action based on the tool name.
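Since action_effect accepts only four values, it can be worth validating it before a request is sent. The sketch below is illustrative: build_check_payload is a hypothetical helper, and the payload field names simply mirror the Python keyword arguments used in this guide rather than a documented wire format.

```python
from typing import Optional

# The four effect values named in this guide.
ACTION_EFFECTS = ("read", "mutating", "destructive", "admin")


def build_check_payload(
    agent_id: str,
    tool_name: str,
    action_effect: Optional[str] = None,
) -> dict:
    """Build an action-check payload, rejecting unknown effect values.

    Hypothetical helper; field names are assumptions.
    """
    payload = {"agent_id": agent_id, "action": "tool_call", "tool_name": tool_name}
    if action_effect is not None:
        if action_effect not in ACTION_EFFECTS:
            raise ValueError(
                "action_effect must be one of " + ", ".join(ACTION_EFFECTS)
            )
        payload["action_effect"] = action_effect
    # When the effect is omitted the key is left out entirely, so the
    # backend can auto-classify from the tool name.
    return payload
```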
Python
result = client.actions.check(
agent_id=agent.id,
action="tool_call",
tool_name="web_search",
tool_input_summary="AI safety papers",
session_id="sess-abc",
action_effect="read",
)
print(f"Allowed: {result.allowed}, Confidence: {result.confidence}")
# If elevation is required
if result.elevation_required:
    print(f"Approval needed: {result.approval_id}")
TypeScript
const result = await client.actions.check({
agentId: agent.id,
toolName: 'web_search',
toolInputSummary: 'AI safety papers',
sessionId: 'sess-abc',
actionEffect: 'read',
});
console.log(`Allowed: ${result.allowed}, Confidence: ${result.confidence}`);
// If elevation is required
if (result.elevationRequired) {
console.log(`Approval needed: ${result.approvalId}`);
}
Go
result, err := client.Actions.Check(ctx, ati.ActionCheckRequest{
AgentID: agent.ID,
Action: "tool_call",
ToolName: "web_search",
ToolInputSummary: "AI safety papers",
SessionID: "sess-abc",
ActionEffect: "read",
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Allowed: %v, Confidence: %v\n", result.Allowed, result.Confidence)
if result.ElevationRequired {
fmt.Printf("Approval needed: %s\n", result.ApprovalID)
}
Java
ActionCheckResult result = client.actions().check(
ActionsAPI.ActionCheckRequest.builder()
.agentId(agent.getId())
.action("tool_call")
.toolName("web_search")
.toolInputSummary("AI safety papers")
.sessionId("sess-abc")
.actionEffect("read")
.build()
);
System.out.println("Allowed: " + result.isAllowed());
if (result.isElevationRequired()) {
System.out.println("Approval needed: " + result.getApprovalId());
}
Rust
let result = client.actions().check(&agenttrust::ActionCheckRequest {
agent_id: agent.id.clone(),
action: "tool_call".to_string(),
tool_name: "web_search".to_string(),
tool_input_summary: "AI safety papers".to_string(),
session_id: "sess-abc".to_string(),
action_effect: Some("read".to_string()),
})?;
println!("Allowed: {}, Confidence: {:?}", result.allowed, result.confidence);
if result.elevation_required == Some(true) {
println!("Approval needed: {:?}", result.approval_id);
}
6. Session Management (AgentTrust ID 1.0)
Sessions track agent-to-server connections with call metrics, scope ceilings, and session modes. Initialize a session when an agent connects to an MCP server, then reference the session ID in all subsequent action checks.
Session Modes
| Mode | Description |
|---|---|
| open | Agent can call any tool within its capabilities |
| restricted | Agent is limited to the scope ceiling set at session init |
| supervised | All calls are logged; destructive actions require elevation |
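The three modes in the table can be read as a small decision function. This is a simplified mental model, not Guardian's actual policy engine: evaluate_call is a hypothetical name, and applying the capability check in every mode is an assumption.

```python
from typing import Set


def evaluate_call(
    mode: str,
    tool_scope: str,
    capabilities: Set[str],
    scope_ceiling: Set[str],
    effect: str = "read",
) -> str:
    """Return "allow", "deny", or "elevate" for a call under a session mode.

    Hypothetical model for illustration only.
    """
    # Assumption: a scope outside the agent's capabilities is denied in every mode.
    if tool_scope not in capabilities:
        return "deny"
    if mode == "open":
        return "allow"
    if mode == "restricted":
        # Limited to the scope ceiling fixed at session init.
        return "allow" if tool_scope in scope_ceiling else "deny"
    if mode == "supervised":
        # Logging of every call is omitted here; destructive actions need elevation.
        return "elevate" if effect == "destructive" else "allow"
    raise ValueError("unknown session mode: " + mode)
```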
Init and Get Session
Python
# Initialize a session
session = client.sessions.init_session(
agent_id=agent.id,
server_id="mcp-code-tools",
)
print(f"Session ID: {session.session_id}")
print(f"Mode: {session.mode}")
print(f"Scope ceiling: {session.scope_ceiling}")
# Retrieve session state later
session = client.sessions.get_session(session.session_id)
print(f"Total calls: {session.total_calls}, Denied: {session.denied_calls}")
TypeScript
// Initialize a session
const session = await client.sessions.initSession(agent.id, 'mcp-code-tools');
console.log(`Session ID: ${session.sessionId}`);
console.log(`Mode: ${session.mode}`);
console.log(`Scope ceiling: ${session.scopeCeiling}`);
// Retrieve session state later
const current = await client.sessions.getSession(session.sessionId);
console.log(`Total calls: ${current.totalCalls}, Denied: ${current.deniedCalls}`);
Go
// Initialize a session
session, err := client.Sessions.InitSession(ctx, ati.InitSessionRequest{
AgentID: agent.ID,
ServerID: "mcp-code-tools",
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Session ID: %s, Mode: %s\n", session.SessionID, session.Mode)
// Retrieve session state later
current, err := client.Sessions.GetSession(ctx, session.SessionID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Total calls: %d, Denied: %d\n", current.TotalCalls, current.DeniedCalls)
Java
// Initialize a session
Session session = client.sessions().initSession(agent.getId(), "mcp-code-tools");
System.out.println("Session ID: " + session.getSessionId());
System.out.println("Mode: " + session.getMode());
// Retrieve session state later
Session current = client.sessions().getSession(session.getSessionId());
System.out.println("Total calls: " + current.getTotalCalls());
Rust
// Initialize a session
let session = client.sessions().init_session(&agent.id, "mcp-code-tools")?;
println!("Session ID: {}", session.session_id);
println!("Mode: {}", session.mode);
println!("Scope ceiling: {:?}", session.scope_ceiling);
// Retrieve session state later
let current = client.sessions().get_session(&session.session_id)?;
println!("Total calls: {}, Denied: {}", current.total_calls, current.denied_calls);
7. Guard Pattern
The Guard wraps action-check and telemetry into a simple check-then-report pattern. It is the recommended way to integrate AgentTrust ID into agents that use raw OpenAI, Anthropic, or other SDKs directly (not framework callbacks).
Guard Lifecycle
- Create a guard with an agent ID and options
- Call check(tool, summary, effect) before each tool call
- Execute the tool
- Call report(tool, success, duration_ms) after each tool call
- Call flush() / close() when the agent is done
Guard Options
| Option | Default | Description |
|---|---|---|
| session_id | auto-generated UUID | Session ID for telemetry correlation |
| block_on_deny | true | Raise/return an error when Guardian denies an action |
| fail_open | false | If Guardian is unreachable: false = block, true = allow |
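The interaction between block_on_deny and fail_open can be sketched without the SDK. The function and exception names below are hypothetical stand-ins, and the check callable replaces the real Guardian call; the branching follows the option descriptions in the table.

```python
from typing import Callable


class GuardianUnavailable(Exception):
    """Stand-in for an unreachable Guardian service (hypothetical type)."""


class ActionDenied(Exception):
    """Stand-in for a policy denial (hypothetical type)."""


def guarded_check(
    check: Callable[[str], bool],
    tool: str,
    *,
    block_on_deny: bool = True,
    fail_open: bool = False,
) -> bool:
    """Apply the two guard options around a raw allow/deny check."""
    try:
        allowed = check(tool)
    except GuardianUnavailable:
        if fail_open:
            return True   # fail_open=True: allow when Guardian is unreachable
        raise             # fail_open=False: block by propagating the error
    if not allowed and block_on_deny:
        raise ActionDenied(tool)
    # With block_on_deny=False the caller inspects the boolean itself.
    return allowed
```

Keeping fail_open at its default of false means an outage stops tool calls instead of silently bypassing policy.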
Full Guard Workflow
Python
import time
from agenttrustid import AgentTrustClient
from agenttrustid.guard import ATIGuard
from agenttrustid.exceptions import AgentTrustError
client = AgentTrustClient.from_env()
# Use as context manager for automatic cleanup
with ATIGuard(client, agent_id=agent.id, fail_open=False) as guard:
    # Before tool call -- raises AgentTrustError if denied
    guard.check("web_search", input_summary="AI news", action_effect="read")
    # Execute the tool
    start = time.time()
    result = do_web_search("AI news")
    duration_ms = int((time.time() - start) * 1000)
    # After tool call -- fire-and-forget telemetry
    guard.report("web_search", success=True, duration_ms=duration_ms)
# guard.close() is called automatically by the context manager
TypeScript
import { AgentTrustClient, AgentTrustGuard } from '@agenttrustid/sdk';
const client = AgentTrustClient.fromEnv();
const guard = new AgentTrustGuard(client, agent.id, {
failOpen: false,
blockOnDeny: true,
});
try {
// Before tool call -- throws AgentTrustError if denied
await guard.check('web_search', 'AI news', 'read');
// Execute the tool
const start = Date.now();
const result = await doWebSearch('AI news');
const durationMs = Date.now() - start;
// After tool call -- fire-and-forget telemetry
guard.report('web_search', true, durationMs);
} finally {
// Flush remaining telemetry
await guard.flush();
}
Go
import (
    "log"
    "time"
    ati "github.com/agenttrustid/sdk/go"
)
client := ati.FromEnv()
guard := ati.NewGuard(client, agent.ID,
ati.WithFailOpen(false),
ati.WithBlockOnDeny(true),
ati.WithActionEffect("read"), // default effect for all checks
)
defer guard.Close(ctx)
// Before tool call -- returns error if denied
if err := guard.Check(ctx, "web_search", "AI news"); err != nil {
log.Printf("Denied: %v", err)
return
}
// Execute the tool
start := time.Now()
result, err := doWebSearch("AI news")
durationMs := int(time.Since(start).Milliseconds())
// After tool call -- fire-and-forget telemetry
guard.Report("web_search", err == nil, durationMs)
Java
import id.agenttrust.sdk.AgentTrustClient;
import id.agenttrust.sdk.AgentTrustGuard;
AgentTrustClient client = AgentTrustClient.fromEnv();
// Use try-with-resources for automatic cleanup
try (AgentTrustGuard guard = new AgentTrustGuard(client, agent.getId())) {
// Before tool call -- throws AgentTrustException if denied
guard.check("web_search", "AI news", "read");
// Execute the tool
long start = System.currentTimeMillis();
Object result = doWebSearch("AI news");
int durationMs = (int) (System.currentTimeMillis() - start);
// After tool call -- fire-and-forget telemetry
guard.report("web_search", true, durationMs);
}
// guard.close() is called automatically
Rust
use agenttrust_sdk::{AgentTrustClient, AgentTrustGuard};
let client = AgentTrustClient::from_env()?;
let guard = AgentTrustGuard::builder(client, &agent.id)
.fail_open(false)
.block_on_deny(true)
.action_effect("read") // default effect for all checks
.build();
// Before tool call -- returns Err if denied
guard.check("web_search", "AI news")?;
// Or with an explicit per-call effect (overrides default)
guard.check_with_effect("db_delete", "drop table users", "destructive")?;
// Execute the tool
let start = std::time::Instant::now();
let result = do_web_search("AI news");
let duration_ms = start.elapsed().as_millis() as u64;
// After tool call -- fire-and-forget telemetry
guard.report("web_search", true, duration_ms);
// Flush remaining telemetry (also happens automatically on Drop)
guard.flush()?;
8. Elevation Workflow (AgentTrust ID 1.0)
When an agent tries to perform a high-impact action (destructive, admin), Guardian may require elevated approval. The flow is:
- guard.check() returns an elevation error with an approval_id
- A human or automated system approves or denies the request via client.approvals.approve() or client.approvals.deny()
- guard.check() is retried; if the request was approved, it now succeeds (within the elevation window)
End-to-End Elevation Example
Python
from agenttrustid.exceptions import AgentTrustError
try:
    guard.check("db_delete", "drop table users", action_effect="destructive")
except AgentTrustError as e:
    if e.code == "ELEVATION_REQUIRED":
        approval_id = e.details["approval_id"]
        print(f"Elevation required. Approval ID: {approval_id}")
        # A human or automated system approves the action
        client.approvals.approve(approval_id, decided_by="admin@company.com")
        # Retry the check -- now it succeeds within the elevation window
        guard.check("db_delete", "drop table users", action_effect="destructive")
    else:
        raise  # Re-raise ACTION_DENIED or other errors
TypeScript
import { AgentTrustError } from '@agenttrustid/sdk';
try {
await guard.check('db_delete', 'drop table users', 'destructive');
} catch (error) {
if (error instanceof AgentTrustError && error.code === 'ELEVATION_REQUIRED') {
const approvalId = error.details?.approvalId;
console.log(`Elevation required. Approval ID: ${approvalId}`);
// A human or automated system approves the action
await client.approvals.approve(approvalId, 'admin@company.com');
// Retry the check
await guard.check('db_delete', 'drop table users', 'destructive');
} else {
throw error;
}
}
Go
import "errors"
err := guard.Check(ctx, "db_delete", "drop table users")
if err != nil {
var elevErr *ati.ElevationRequiredError
if errors.As(err, &elevErr) {
fmt.Printf("Elevation required. Approval ID: %s\n", elevErr.ApprovalID)
// A human or automated system approves the action
if err := client.Approvals.Approve(ctx, elevErr.ApprovalID, "admin@company.com"); err != nil {
log.Fatal(err)
}
// Retry the check
if err := guard.Check(ctx, "db_delete", "drop table users"); err != nil {
log.Fatal(err)
}
} else {
log.Fatal(err)
}
}
Java
import id.agenttrust.sdk.exceptions.ElevationRequiredException;
import id.agenttrust.sdk.exceptions.AgentTrustException;
try {
guard.check("db_delete", "drop table users", "destructive");
} catch (ElevationRequiredException e) {
String approvalId = e.getApprovalId();
System.out.println("Elevation required. Approval ID: " + approvalId);
// A human or automated system approves the action
client.approvals().approve(approvalId, "admin@company.com");
// Retry the check
guard.check("db_delete", "drop table users", "destructive");
} catch (AgentTrustException e) {
// Handle ACTION_DENIED or other errors
throw e;
}
Rust
use agenttrust_sdk::AgentTrustError;
match guard.check_with_effect("db_delete", "drop table users", "destructive") {
Ok(()) => {
// Proceed with the action
}
Err(AgentTrustError::ElevationRequired { approval_id, message }) => {
println!("Elevation required: {}. Approval ID: {}", message, approval_id);
// A human or automated system approves the action
client.approvals().approve(&approval_id, "admin@company.com")?;
// Retry the check
guard.check_with_effect("db_delete", "drop table users", "destructive")?;
}
Err(e) => return Err(e),
}
9. Approval Management
Manage elevation approval requests programmatically. Approvals are created automatically by Guardian when an action requires elevation. They expire after a configurable window (default: 5 minutes).
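The four approval statuses (pending, approved, denied, expired) and the expiry window can be modelled as follows. This is a sketch under assumptions: the Approval dataclass and effective_status are hypothetical, and treating a recorded decision as final, regardless of elapsed time, is a simplification the guide does not confirm.

```python
from dataclasses import dataclass
from typing import Optional

DEFAULT_WINDOW_SECONDS = 300  # the documented default of 5 minutes


@dataclass
class Approval:
    """Minimal stand-in for an approval request (hypothetical shape)."""

    created_at: float               # seconds since some epoch
    decision: Optional[str] = None  # "approved" or "denied" once decided


def effective_status(
    approval: Approval,
    now: float,
    window_seconds: float = DEFAULT_WINDOW_SECONDS,
) -> str:
    """Derive pending | approved | denied | expired for an approval."""
    # Assumption: a recorded decision is final and does not later expire.
    if approval.decision in ("approved", "denied"):
        return approval.decision
    if now - approval.created_at >= window_seconds:
        return "expired"
    return "pending"
```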
Approve, Deny, and Get
Python
# Approve an elevation request
client.approvals.approve(approval_id, decided_by="admin@company.com")
# Deny an elevation request
client.approvals.deny(approval_id, decided_by="security@company.com")
# Check approval status
approval = client.approvals.get(approval_id)
print(f"Status: {approval.status}") # pending | approved | denied | expired
print(f"Action: {approval.action_name}, Effect: {approval.action_effect}")
print(f"Decided by: {approval.decided_by}")
TypeScript
// Approve an elevation request
await client.approvals.approve(approvalId, 'admin@company.com');
// Deny an elevation request
await client.approvals.deny(approvalId, 'security@company.com');
// Check approval status
const approval = await client.approvals.get(approvalId);
console.log(`Status: ${approval.status}`);
console.log(`Action: ${approval.actionName}, Effect: ${approval.actionEffect}`);
console.log(`Decided by: ${approval.decidedBy}`);
Go
// Approve an elevation request
err := client.Approvals.Approve(ctx, approvalID, "admin@company.com")
// Deny an elevation request
err = client.Approvals.Deny(ctx, approvalID, "security@company.com") // reuse err; a second := would not compile
// Check approval status
approval, err := client.Approvals.Get(ctx, approvalID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Status: %s, Action: %s\n", approval.Status, approval.ActionName)
Java
// Approve an elevation request
client.approvals().approve(approvalId, "admin@company.com");
// Deny an elevation request
client.approvals().deny(approvalId, "security@company.com");
// Check approval status
ApprovalRequest approval = client.approvals().get(approvalId);
System.out.println("Status: " + approval.getStatus());
System.out.println("Action: " + approval.getActionName());
Rust
// Approve an elevation request
client.approvals().approve(&approval_id, "admin@company.com")?;
// Deny an elevation request
client.approvals().deny(&approval_id, "security@company.com")?;
// Check approval status
let approval = client.approvals().get(&approval_id)?;
println!("Status: {}, Action: {}", approval.status, approval.action_name);
10. MCP Proxy
Register external MCP tool servers and call tools through AgentTrust ID’s security proxy. Every tools/call is Guardian-checked before forwarding to the upstream server. The session_id parameter correlates calls with an AgentTrust ID session.
MCP proxy integration is available in the Python and TypeScript SDKs. Go, Java, and Rust SDKs use the action-check API directly for MCP-related authorization.
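For readers unfamiliar with the method and params arguments used in the proxy calls, MCP messages are JSON-RPC 2.0 requests. The sketch below builds a tools/call request; the helper name tools_call_request is hypothetical, and how the AgentTrust ID proxy wraps or forwards this envelope is not specified in this guide.

```python
import itertools
import json
from typing import Any, Dict

_request_ids = itertools.count(1)  # simple monotonically increasing request IDs


def tools_call_request(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request for the MCP tools/call method.

    Hypothetical helper for illustration.
    """
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


if __name__ == "__main__":
    request = tools_call_request(
        "run_code", {"language": "python", "code": "print('hello')"}
    )
    print(json.dumps(request, indent=2))
```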
Python
# Register an MCP tool server
server = client.mcp.register_server(
name="code-tools",
url="https://mcp-server.example.com",
capabilities=["run_code", "lint", "format"],
)
print(f"Server ID: {server.id}")
# Call a tool through the proxy (Guardian checks apply)
result = client.mcp.call_tool(
server_id=server.id,
method="tools/call",
params={
"name": "run_code",
"arguments": {"language": "python", "code": "print('hello')"},
},
session_id=session.session_id, # correlate with AgentTrust ID session
)
print(result)
# List registered servers
servers = client.mcp.list_servers()
# Remove a server
client.mcp.remove_server(server.id)
TypeScript
// Register an MCP server
const server = await client.mcp.registerServer({
name: 'code-tools',
url: 'https://mcp-server.example.com',
capabilities: ['run_code', 'lint', 'format'],
});
// Call a tool through the proxy with session correlation
const result = await client.mcp.callTool(
server.id,
'tools/call',
{ name: 'run_code', arguments: { language: 'python', code: "print('hello')" } },
session.sessionId // correlate with AgentTrust ID session
);
// List servers
const servers = await client.mcp.listServers();
// Remove a server
await client.mcp.removeServer(server.id);
11. A2A Delegation
Agent Cards
Generate, publish, and retrieve A2A-compliant Agent Cards. Published cards are available at /a2a/agents/{id}/agent.json.
Python
# Generate an Agent Card
card = client.agent_cards.generate(agent_id=agent.id)
print(f"{card.name}: {card.capabilities}")
# Publish the card (makes it publicly discoverable)
client.agent_cards.publish(agent_id=agent.id)
# Retrieve a public Agent Card (no auth required)
public_card = client.agent_cards.get_public(agent_id=agent.id)
TypeScript
// Generate an Agent Card
const card = await client.agentCards.generate(agent.id);
// Publish the card
await client.agentCards.publish(agent.id);
// Retrieve a public card
const publicCard = await client.agentCards.getPublic(agent.id);
A2A Task Dispatch
Send work to another agent using the A2A v1.0 message/send method. The v0.3 send_task / sendTask methods remain available for back-compatibility.
Python
# A2A v1.0: send a message to a target agent
task = client.a2a.send_message(
agent_b.id,
"Search for recent AI safety papers and summarize the top 3",
)
print(f"Task {task.id}")
# Poll for completion, then cancel if needed
status = client.a2a.get_task(task.id)
client.a2a.cancel_task(task.id)
# Back-compat (A2A v0.3):
# client.a2a.send_task(source_agent_id=..., target_agent_id=..., message=...)
TypeScript
// A2A v1.0: send a message to a target agent
const task = await client.a2a.sendMessage(agentB.id, {
text: 'Search for recent AI safety papers',
});
const status = await client.a2a.getTask(task.id);
await client.a2a.cancelTask(task.id);
// Back-compat (A2A v0.3):
// client.a2a.sendTask({ sourceAgentId, targetAgentId, message })
Delegation Chains
Delegate capabilities from one agent to another with scope narrowing. Delegations are time-bounded, revocable, and audited.
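Scope narrowing means a delegation can pass on a subset of the delegator's scopes but never more. The following sketch shows that rule together with the time-bound and revocation checks; narrow_scope and is_active are hypothetical helpers, not SDK functions, and the server's actual validation may differ.

```python
from typing import Iterable, List


def narrow_scope(delegator_scopes: Iterable[str], requested: Iterable[str]) -> List[str]:
    """Return the delegated scopes, rejecting any the delegator does not hold."""
    requested_set = set(requested)
    extra = requested_set - set(delegator_scopes)
    if extra:
        raise ValueError("cannot delegate scopes not held: " + ", ".join(sorted(extra)))
    return sorted(requested_set)


def is_active(expires_at: float, revoked: bool, now: float) -> bool:
    """A delegation is usable only if it is unrevoked and unexpired."""
    return not revoked and now < expires_at
```

Applied along a chain, the same subset rule means each hop can only shrink the usable scope, never widen it.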
Python
# Delegate web_search capability for 1 hour
delegation = client.delegations.create(
from_agent_id=agent_a.id,
to_agent_id=agent_b.id,
scope=["web_search"],
ttl_seconds=3600,
)
print(f"Delegation {delegation.id} expires at {delegation.expires_at}")
# List delegations
delegations = client.delegations.list()
# Revoke a delegation
client.delegations.revoke(delegation.id)
TypeScript
// Create a delegation
const delegation = await client.delegations.create({
fromAgentId: agentA.id,
toAgentId: agentB.id,
scope: ['web_search'],
ttlSeconds: 3600,
});
// List delegations
const delegations = await client.delegations.list();
// Revoke a delegation
await client.delegations.revoke(delegation.id);
Delegated Session Bridge
Use a delegation to bootstrap a local AgentTrust ID session before making mediated calls.
Python
session = client.delegations.init_session(delegation.id)
print(session.session_id, session.delegation_id, session.scope_ceiling)
TypeScript
const session = await client.delegations.initSession(delegation.id);
console.log(session.sessionId, session.delegationId, session.scopeCeiling);
Go
session, err := client.Delegations.InitSession(ctx, delegation.ID)
if err != nil {
log.Fatal(err)
}
fmt.Println(session.SessionID, session.DelegationID, session.ScopeCeiling)
API Session Bridge
Bridge an already-issued AgentTrust ID API token into a local AgentTrust ID session.
Python
session = client.sessions.init_api_session(token.token)
print(session.session_id, session.source)
TypeScript
const session = await client.sessions.initApiSession(token.token);
console.log(session.sessionId, session.source);
Go
session, err := client.Sessions.InitAPISession(ctx, ati.InitAPISessionRequest{
Token: token.Token,
})
if err != nil {
log.Fatal(err)
}
Federation
Register OIDC providers, verify external ID tokens, and bridge them into local AgentTrust ID sessions.
Python
provider = client.federation.register_provider(
issuer="https://issuer.example.com",
name="Example Workforce",
trust_level="high",
)
result = client.federation.verify_token(token="eyJ...")
session = client.federation.init_session(token="eyJ...")
TypeScript
const provider = await client.federation.registerProvider({
issuer: 'https://issuer.example.com',
name: 'Example Workforce',
trustLevel: 'high',
});
const result = await client.federation.verifyToken({ token: 'eyJ...' });
const session = await client.federation.initSession({ token: 'eyJ...' });
Go
provider, err := client.Federation.RegisterProvider(ctx, ati.RegisterProviderRequest{
Issuer: "https://issuer.example.com",
Name: "Example Workforce",
TrustLevel: "high",
})
if err != nil {
log.Fatal(err)
}
_, _ = client.Federation.VerifyToken(ctx, ati.VerifyFederatedTokenRequest{Token: "eyJ..."})
_, _ = client.Federation.InitSession(ctx, ati.InitFederatedSessionRequest{Token: "eyJ..."})
_ = provider
SIEM Streaming
Configure audit-event delivery to external security platforms.
Python
destination = client.streaming.create(
name="Splunk HEC",
destination_type="splunk",
endpoint_url="https://splunk.example.com/services/collector",
auth_token="secret",
)
logs = client.streaming.delivery_log(destination.id)
client.streaming.test(destination.id)
TypeScript
const destination = await client.streaming.create({
name: 'Splunk HEC',
destinationType: 'splunk',
endpointUrl: 'https://splunk.example.com/services/collector',
authToken: 'secret',
});
const logs = await client.streaming.deliveryLog(destination.id);
await client.streaming.test(destination.id);
Go
destination, err := client.Streaming.Create(ctx, ati.CreateSIEMDestinationRequest{
Name: "Splunk HEC",
DestinationType: "splunk",
EndpointURL: "https://splunk.example.com/services/collector",
AuthToken: "secret",
})
if err != nil {
log.Fatal(err)
}
_, _ = client.Streaming.DeliveryLog(ctx, destination.ID)
_ = client.Streaming.Test(ctx, destination.ID)
12. Error Handling
All SDKs use structured error types with machine-readable codes.
Error Codes
| Code | Description |
|---|---|
| ACTION_DENIED | Guardian denied the action based on policy |
| ELEVATION_REQUIRED | Action requires elevated approval before proceeding |
| GUARDIAN_UNAVAILABLE | Guardian service is unreachable (and fail_open is false) |
| AUTH_FAILED | Invalid or missing API key (HTTP 401) |
| AUTH_DENIED | Insufficient permissions (HTTP 403) |
| NOT_FOUND | Resource not found (HTTP 404) |
| VALIDATION_ERROR | Request validation failed (HTTP 400) |
| NETWORK_ERROR | Network-level failure (connection refused, timeout, DNS) |
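The HTTP-backed rows of the table translate directly into a lookup. In this sketch the status-to-code mapping comes from the table, while the "UNKNOWN" fallback and the choice of which codes are worth retrying are assumptions made for illustration.

```python
# Status-to-code mapping taken from the error-code table.
HTTP_STATUS_TO_CODE = {
    400: "VALIDATION_ERROR",
    401: "AUTH_FAILED",
    403: "AUTH_DENIED",
    404: "NOT_FOUND",
}

# Assumption: only transport-level and availability failures are retryable.
RETRYABLE_CODES = frozenset({"NETWORK_ERROR", "GUARDIAN_UNAVAILABLE"})


def code_for_status(status: int) -> str:
    """Map an HTTP status to an error code; "UNKNOWN" is a hypothetical fallback."""
    return HTTP_STATUS_TO_CODE.get(status, "UNKNOWN")


def should_retry(code: str) -> bool:
    """Policy denials and auth failures will not change on retry."""
    return code in RETRYABLE_CODES
```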
Language-Specific Error Types
Python
from agenttrustid.exceptions import AgentTrustError, AuthenticationError, NetworkError
try:
    guard.check("dangerous_tool", "data", action_effect="destructive")
except AgentTrustError as e:
    print(f"Code: {e.code}")  # "ACTION_DENIED" | "ELEVATION_REQUIRED" | ...
    print(f"Message: {e.message}")
    print(f"Details: {e.details}")  # {"approval_id": "..."} when ELEVATION_REQUIRED
Exception hierarchy:
- AgentTrustError: base class with code, message, details
- AuthenticationError: HTTP 401
- AuthorizationError: HTTP 403
- NetworkError: connection failures
- ValidationError: HTTP 400
- TokenExpiredError: expired tokens
- AgentRevokedError: revoked agents
TypeScript
import { AgentTrustError, AuthenticationError, NetworkError } from '@agenttrustid/sdk';
try {
await guard.check('dangerous_tool', 'data', 'destructive');
} catch (error) {
if (error instanceof AgentTrustError) {
console.log(`Code: ${error.code}`); // "ACTION_DENIED" | "ELEVATION_REQUIRED" | ...
console.log(`Message: ${error.message}`);
console.log(`Details:`, error.details); // { approvalId: "..." } when ELEVATION_REQUIRED
}
}
Error classes:
- AgentTrustError: base class with code, message, details
- AuthenticationError: HTTP 401
- AuthorizationError: HTTP 403
- NetworkError: connection failures
- ValidationError: HTTP 400
Go
import (
    "errors"
    "fmt"
)
// Check the more specific error types first; AgentTrustError is the catch-all.
err := guard.Check(ctx, "dangerous_tool", "data")
if err != nil {
    var elevErr *ati.ElevationRequiredError
    var atiErr *ati.AgentTrustError
    var netErr *ati.NetworkError
    switch {
    case errors.As(err, &elevErr):
        fmt.Printf("Elevation required, approval ID: %s\n", elevErr.ApprovalID)
    case errors.As(err, &netErr):
        fmt.Printf("Network error: %s\n", netErr.Message)
    case errors.As(err, &atiErr):
        fmt.Printf("AgentTrust ID error [%s]: %s\n", atiErr.Code, atiErr.Message)
    default:
        fmt.Printf("Unknown error: %v\n", err)
    }
}
Error types:
- *AgentTrustError: base type with Code and Message fields
- *ElevationRequiredError: embeds AgentTrustError, adds ApprovalID field
- *NetworkError: embeds AgentTrustError, for connection failures
Java
import id.agenttrust.sdk.exceptions.AgentTrustException;
import id.agenttrust.sdk.exceptions.ElevationRequiredException;
import id.agenttrust.sdk.exceptions.NetworkException;
try {
    guard.check("dangerous_tool", "data", "destructive");
} catch (ElevationRequiredException e) {
    System.out.println("Approval ID: " + e.getApprovalId());
} catch (NetworkException e) {
    System.out.println("Network error: " + e.getMessage());
} catch (AgentTrustException e) {
    System.out.println("Code: " + e.getCode()); // "ACTION_DENIED", etc.
    System.out.println("Status: " + e.getStatus()); // HTTP status code
}
Exception hierarchy:
- AgentTrustException: base class with getCode(), getMessage(), getStatus()
- ElevationRequiredException: adds getApprovalId()
- NetworkException: connection failures
Rust
use agenttrust_sdk::AgentTrustError;
match guard.check_with_effect("dangerous_tool", "data", "destructive") {
    Ok(()) => println!("Allowed"),
    Err(AgentTrustError::ActionDenied { message, check_id }) => {
        println!("Denied: {}, check_id: {:?}", message, check_id);
    }
    Err(AgentTrustError::ElevationRequired { message, approval_id }) => {
        println!("Elevation required: {}, approval_id: {}", message, approval_id);
    }
    Err(AgentTrustError::GuardianUnavailable { message }) => {
        println!("Guardian unreachable: {}", message);
    }
    Err(AgentTrustError::Authentication { message, .. }) => {
        println!("Auth failed: {}", message);
    }
    Err(AgentTrustError::Network(e)) => {
        println!("Network error: {}", e);
    }
    Err(e) => println!("Other error: {}", e),
}
Error enum variants:
- AgentTrustError::ActionDenied { message, check_id }: Guardian denied the action
- AgentTrustError::ElevationRequired { message, approval_id }: elevation needed
- AgentTrustError::GuardianUnavailable { message }: Guardian is unreachable
- AgentTrustError::Authentication { message, status }: HTTP 401
- AgentTrustError::Authorization { message, status }: HTTP 403
- AgentTrustError::Validation { message, status }: HTTP 400
- AgentTrustError::NotFound { message, status }: HTTP 404
- AgentTrustError::Network(reqwest::Error): connection failures
- AgentTrustError::Api { message, code, status }: other HTTP errors
- AgentTrustError::Json(serde_json::Error): serialization failures
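Across all five SDKs the error carries a code, so transient failures can be separated from policy decisions. As an illustration only, here is a small Python retry wrapper with exponential backoff. It assumes the raised exception exposes a `code` attribute, as `AgentTrustError` does in the Python example above; `call_with_retry`, `FakeNetworkError`, and the choice of which codes to retry are hypothetical and not part of the SDK.

```python
import time

# Assumption: only transient infrastructure failures are retried.
RETRYABLE_CODES = ("GUARDIAN_UNAVAILABLE", "NETWORK_ERROR")


def call_with_retry(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(); retry with exponential backoff when the error code is transient."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as err:
            is_last = attempt == attempts - 1
            # Policy decisions (ACTION_DENIED, ELEVATION_REQUIRED, ...) propagate immediately.
            if getattr(err, "code", None) not in RETRYABLE_CODES or is_last:
                raise
            sleep(base_delay * 2 ** attempt)


class FakeNetworkError(Exception):
    """Hypothetical error used only to exercise the wrapper."""
    code = "NETWORK_ERROR"


calls = []
delays = []


def flaky_check():
    # Fails on the first two calls, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise FakeNetworkError("connection refused")
    return "allowed"


# Inject delays.append instead of time.sleep so the demo records delays without waiting.
result = call_with_retry(flaky_check, sleep=delays.append)
```

In real use, `fn` would be a closure around `guard.check(...)`, and the `sleep` parameter would be left at its default.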
13. Data Privacy
The SDK is private by default. Tool outputs and prompts are never sent to AgentTrust ID, and tool inputs are sent only when input logging is explicitly enabled.
| Data | Sent? | Notes |
|---|---|---|
| Tool name | Yes | e.g., web_search |
| Tool input | No (by default) | Only character count unless log_inputs=True |
| Tool output | Never | Only a success/failure boolean |
| Duration | Yes | Milliseconds |
| Error type | Yes | Exception class name only, not message |
| Full prompts | Never | SDK has no access to LLM prompts |
When log_inputs=True (Python) or equivalent is enabled, inputs are truncated to 200 characters before sending. The tool_input_summary parameter in action checks is also capped at 200 characters.
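The input-handling rule above can be sketched as follows. This is an illustration under assumptions, not the SDK's code: the function name and the field names `input_chars` and `input_summary` are hypothetical, and only the 200-character cap and the "character count by default" behavior come from this section.

```python
MAX_SUMMARY_CHARS = 200  # cap stated in this guide


def build_input_fields(tool_input: str, log_inputs: bool = False) -> dict:
    """Return the input-related fields that would leave the process.

    Field names are hypothetical; the real wire format may differ.
    """
    # By default only the character count is reported, never the content.
    fields = {"input_chars": len(tool_input)}
    if log_inputs:
        # When logging is enabled, content is truncated before sending.
        fields["input_summary"] = tool_input[:MAX_SUMMARY_CHARS]
    return fields
```

Truncating before the data leaves the process, rather than on the server, means the full input never crosses the network even when logging is on.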
What the Guard Sends
Each check() call sends:
- Agent ID
- Tool name
- Truncated input summary (max 200 chars, empty by default)
- Session ID
- Action effect hint (if provided)
Each report() call buffers:
- Tool name
- Success boolean
- Duration in milliseconds
- Error type (class name only, if failed)
Telemetry is batched (up to 10 events) and sent asynchronously. It is best-effort and never crashes your agent.
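The batching and best-effort behavior can be pictured with a small buffer like the one below. It is a simplified sketch, not the SDK's implementation: `TelemetryBuffer` and its `send` callable are hypothetical, and the send here happens synchronously for clarity, whereas the SDK sends asynchronously.

```python
class TelemetryBuffer:
    """Hypothetical buffer: collects events and hands them off in batches."""

    def __init__(self, send, batch_size=10):
        self._send = send              # callable that receives a list of events
        self._batch_size = batch_size  # this guide states batches of up to 10
        self._events = []

    def report(self, tool, success, duration_ms, error_type=None):
        # Only metadata is recorded: no tool input, output, or error message.
        self._events.append({
            "tool": tool,
            "success": success,
            "duration_ms": duration_ms,
            "error_type": error_type,
        })
        if len(self._events) >= self._batch_size:
            self.flush()

    def flush(self):
        if not self._events:
            return
        batch, self._events = self._events, []
        try:
            self._send(batch)
        except Exception:
            # Best-effort: a failed send drops the batch instead of raising.
            pass


sent = []
buffer = TelemetryBuffer(send=sent.append)
for _ in range(10):
    buffer.report("web_search", True, 12)
```

After the tenth `report()` call the buffer flushes automatically, so `sent` holds a single batch of ten events.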
Quick Reference: Method Signatures
| Operation | Python | TypeScript | Go | Java | Rust |
|---|---|---|---|---|---|
| Session init | sessions.init_session(agent_id, server_id) | sessions.initSession(agentId, serverId) | Sessions.InitSession(ctx, req) | sessions().initSession(agentId, serverId) | sessions().init_session(agent_id, server_id) |
| API session init | sessions.init_api_session(token) | sessions.initApiSession(token) | Sessions.InitAPISession(ctx, req) | Not yet supported | Not yet supported |
| Session get | sessions.get_session(session_id) | sessions.getSession(sessionId) | Sessions.GetSession(ctx, sessionID) | sessions().getSession(sessionId) | sessions().get_session(session_id) |
| Approve | approvals.approve(approval_id, decided_by) | approvals.approve(approvalId, decidedBy) | Approvals.Approve(ctx, approvalID, decidedBy) | approvals().approve(approvalId, decidedBy) | approvals().approve(approval_id, decided_by) |
| Deny | approvals.deny(approval_id, decided_by) | approvals.deny(approvalId, decidedBy) | Approvals.Deny(ctx, approvalID, decidedBy) | approvals().deny(approvalId, decidedBy) | approvals().deny(approval_id, decided_by) |
| Approval get | approvals.get(approval_id) | approvals.get(approvalId) | Approvals.Get(ctx, approvalID) | approvals().get(approvalId) | approvals().get(approval_id) |
| Delegation create | delegations.create(...) | delegations.create({...}) | Delegations.Create(ctx, req) | Not yet supported | Not yet supported |
| Delegation session init | delegations.init_session(delegation_id) | delegations.initSession(delegationId) | Delegations.InitSession(ctx, delegationID) | Not yet supported | Not yet supported |
| Federation provider register | federation.register_provider(...) | federation.registerProvider({...}) | Federation.RegisterProvider(ctx, req) | Not yet supported | Not yet supported |
| Federation session init | federation.init_session(token, issuer_hint=None) | federation.initSession({ token, issuerHint? }) | Federation.InitSession(ctx, req) | Not yet supported | Not yet supported |
| SIEM destination create | streaming.create(...) | streaming.create({...}) | Streaming.Create(ctx, req) | Not yet supported | Not yet supported |
| Guard check | check(tool, summary, effect) | check(tool, summary, effect) | Check(ctx, tool, summary) | check(tool, summary, effect) | check(tool, summary) / check_with_effect(tool, summary, effect) |
| Guard report | report(tool, success, duration_ms) | report(tool, success, durationMs) | Report(tool, success, durationMs) | report(tool, success, durationMs) | report(tool, success, duration_ms) |
| Guard flush | close() | flush() | Close(ctx) / Flush(ctx) | close() / flush() | flush() (also on Drop) |
FAQ
Q: Does the SDK add latency to tool calls?
Each action check adds a small amount of latency per call; the exact amount varies by deployment and is not guaranteed. Telemetry is sent asynchronously, off the synchronous call path.
Q: What happens if I don’t set an API key?
All API calls will fail with 401 Unauthorized. The sk_live_ key authenticates your organization.
Q: Can I use this with CrewAI or AutoGen?
Yes. The Python SDK provides framework-specific callbacks for CrewAI and AutoGen. LangChain support is a separate extra: pip install agenttrustid[langchain].
Q: Do I need OAuth for A2A or MCP features?
For A2A task dispatch between agents within your organization, API key auth is sufficient. OAuth 2.1 is required when external MCP clients connect to AgentTrust ID’s proxy or when federating with agents outside your org.
Q: Is the Rust SDK async?
The Rust SDK uses reqwest::blocking for synchronous operation. Async support is planned for a future release.
Q: How do I handle telemetry failures?
Telemetry is always best-effort. If the telemetry endpoint is unreachable, events are dropped silently. Your agent never crashes due to telemetry.