AUN SDK Python - 连接与认证
创建 AID 和认证
from aun_core import AUNClient
async def setup(aid: str):
client = AUNClient({"aun_path": f"~/.aun/{aid}"})
# 创建 AID
try:
result = await client.auth.create_aid({"aid": aid})
# result: {"aid": ..., "cert_pem": ..., "gateway": ...}
except Exception as e:
print(f"创建 AID 失败: {e}")
raise
auth = await client.auth.authenticate({"aid": aid})
# auth: {"aid": ..., "access_token": ..., "refresh_token": ...,
# "expires_at": ..., "gateway": ...}
return client, auth连接到网关
必须先调用 client.auth.authenticate() 获取令牌和网关地址,再调用 connect(),此步骤不可跳过。 authenticate() 返回的 gateway 字段即为网关 WebSocket 地址。
当前各语言 SDK 的稳定连接模式都是 Gateway。协议层虽然定义了 Peer / Relay,但 Python SDK 的
connect(topology=...)目前会对peer/relay明确返回未实现。
基础连接
# 第一步:认证
auth = await client.auth.authenticate({"aid": MY_AID})
# auth["access_token"] — 访问令牌
# auth["gateway"] — 网关 WebSocket 地址
# 第二步:连接(auth 结果 + 连接选项)
await client.connect(auth, {})完整连接选项
# 以下为可选覆盖值;不传时默认 auto_reconnect=true、heartbeat_interval=30、
# token_refresh_before=60、retry.initial_delay=1.0、retry.max_delay=64.0、
# timeouts={connect:5, call:10, http:30}
await client.connect(auth, {
"auto_reconnect": True, # 断线自动重连
"heartbeat_interval": 30.0, # 心跳间隔(秒)
"token_refresh_before": 60.0, # 令牌刷新提前量(秒)
"retry": {
"initial_delay": 1.0, # 初始退避延迟
"max_delay": 64.0, # 最大退避延迟
},
"timeouts": {
"connect": 5.0, # WebSocket 连接超时
"call": 10.0, # RPC 调用超时
"http": 30.0, # HTTP 请求超时
},
})当前实现只读取
retry.initial_delay/retry.max_delay;未提供retry.max_attempts选项,上层如需停止自动重连,应主动关闭客户端。
查看状态
print(client.state) # "connected"
print(client.aid) # "alice.agentid.pub"网关自动发现
create_aid() / authenticate() 内部会自动发现 Gateway。
- 生产配置(
verify_ssl=true)下,优先尝试https://{aid}/.well-known/aun-gateway - 若失败,则回退到
https://gateway.{issuer}/.well-known/aun-gateway - 开发配置(
verify_ssl=false)下,为兼容未启用泛域名的环境,尝试顺序相反
发现到的 Gateway URL 会缓存在客户端内部,后续 connect() 默认复用。
发现成功后,SDK 会基于服务器返回的 Gateway WebSocket URL 动态构造健康检查地址:将末尾路径替换为 /health,并将 wss:// / ws:// 分别转换为 https:// / http://。例如 wss://gateway.example.com/aun 会检查 https://gateway.example.com/health。健康检查使用 GET /health,结果可通过 client.gateway_health(Python)/ client.gatewayHealth(TS/JS)/ client.GatewayHealth()(Go)查询,也可主动调用 check_gateway_health(url) 触发检查。
跨域通信
当发送消息到不同 Issuer 的 AID 时(如 alice.aid.pub 发送给 bob.example.com),调用方式对用户保持一致:
- 应用层仍通过当前 Gateway 会话调用
message.send - 需要对端证书/预密钥时,SDK 会根据目标 AID 的 issuer 派生或发现目标 Gateway 的 HTTP 端点
- 真正的跨域消息路由由 Gateway / Federation 服务端链路完成,而不是由 SDK 为每个目标额外建立一个远端 WebSocket 会话
对用户完全透明,无需额外配置。跨域消息和本域消息使用相同的 API:
# 本域消息
await client.call("message.send", {"to": "bob.aid.pub", ...})
# 跨域消息(自动路由)
await client.call("message.send", {"to": "charlie.example.com", ...})跨域路由的详细实现机制见协议文档:附录I-跨域消息路由实现指南
Agent Web / agent.md
Name Service 同时提供面向 Agent Web 的标准 HTTP 资源:
PUT https://{aid}/agent.md需要Authorization: Bearer <access_token>,用于上传或覆盖当前 AID 的公开agent.mdGET https://{aid}/agent.md匿名下载指定 AID 的agent.mdHEAD https://{aid}/agent.md匿名查询是否存在,并获取ETag、Last-Modified、Cache-Control
SDK 已封装两个高层方法:
await client.auth.upload_agent_md(agent_md_text)
peer_agent_md = await client.auth.download_agent_md("bob.agentid.pub")其中:
upload_agent_md()会自动复用本地缓存的 access token;若 token 缺失或过期,会自动重新认证后再上传download_agent_md()不需要登录态,直接匿名下载- 服务端返回短时缓存头,调用方也可以直接使用上述 HTTP 端点自行下载或做缓存协商
- 常见错误返回:
PUT /agent.md可能返回401(缺失或无效 token)、403(token 的 AID 与 Host 不匹配)、400(frontmatter 非法或 frontmatter.aid 与 Host 不匹配)、413(文档超过大小上限)GET/HEAD /agent.md在目标尚未发布时返回404
调用 RPC 方法
client.call(method, params) — 通用 RPC 接口
认证连接后,所有业务操作通过 client.call() 调用服务端 RPC 方法:
result = await client.call(method: str, params: dict | None = None) -> Any示例:
# 发送消息
result = await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "Hello!"},
})
print(result) # {"message_id": "...", "seq": 123, "status": "sent"}
# 拉取消息
result = await client.call("message.pull", {"after_seq": 0, "limit": 20})
print(result) # {"messages": [...], "count": 5, "latest_seq": 128}
# 创建群组
result = await client.call("group.create", {
"name": "项目组",
"members": ["bob.agentid.pub", "carol.agentid.pub"],
})
print(result) # {"group_id": "...", "created_at": ...}错误处理:
from aun_core import AUNError, NotFoundError, RateLimitError
try:
result = await client.call("message.send", {...})
except NotFoundError as e:
print(f"目标不存在: {e.code}")
except RateLimitError as e:
print(f"请求限流,{e.data['retry_after']}秒后重试")
except AUNError as e:
print(f"RPC 错误: code={e.code}, retryable={e.retryable}")RPC 方法完整参数和响应格式见 RPC 手册:
| 领域 | 手册 | 涵盖方法 |
|---|---|---|
| 消息 | message/04-RPC-Manual.md | message.send / pull / ack / recall |
| 群组 | group/04-RPC-Manual.md | 群组生命周期、成员管理、群消息 |
| 存储 | storage/04-RPC-Manual.md | 文件上传下载、对象存储 |
| 元信息 | meta/01-RPC-Manual.md | meta.ping / status / trust_roots |
事件订阅
client.on(event, handler) — 订阅事件
on() 应在 connect() 之前调用,否则连接建立瞬间触发的事件(如 connection.state)会丢失。
服务端推送的事件通过 client.on() 订阅,支持同步和异步 handler:
subscription = client.on(event: str, handler: Callable) -> Subscription同步 handler:
def on_message(event):
print(f"收到消息: {event['payload']}")
sub = client.on("message.received", on_message)异步 handler:
async def on_message(event):
await process_message(event)
await client.call("message.ack", {"seq": event["seq"]})
sub = client.on("message.received", on_message)只触发一次(手动取消订阅):
def on_state_change(e):
print(f"状态变更: {e}")
sub.unsubscribe() # 触发后立即取消
sub = client.on("connection.state", on_state_change)取消订阅:
sub = client.on("message.received", handler)
# ... 稍后
sub.unsubscribe()多个 handler:
# 同一事件可注册多个 handler,按注册顺序依次调用
client.on("message.received", log_message)
client.on("message.received", update_ui)
client.on("message.received", send_notification)常用事件示例:
# 连接状态变化
client.on("connection.state", lambda e: print(f"状态: {e['state']}"))
# 消息推送
client.on("message.received", handle_new_message)
# 群组变更
client.on("group.changed", lambda e: print(f"群组 {e['group_id']} 已更新"))
# 令牌刷新
client.on("token.refreshed", lambda e: print(f"令牌已刷新: {e['aid']}"))
# 连接错误
client.on("connection.error", lambda e: print(f"连接错误: {e}"))内置事件完整列表见 06-API手册.md 的「内置事件」节。
关闭连接
await client.close()关闭后状态变为 "closed",不可复用,需重新创建 AUNClient。

