Skip to content

AUN SDK Python - 连接与认证


创建 AID 和认证

python
from aun_core import AUNClient

async def setup(aid: str):
    client = AUNClient({"aun_path": f"~/.aun/{aid}"})

    # 创建 AID
    try:
        result = await client.auth.create_aid({"aid": aid})
        # result: {"aid": ..., "cert_pem": ..., "gateway": ...}
    except Exception as e:
        print(f"创建 AID 失败: {e}")
        raise

    auth = await client.auth.authenticate({"aid": aid})
    # auth: {"aid": ..., "access_token": ..., "refresh_token": ...,
    #        "expires_at": ..., "gateway": ...}
    return client, auth

连接到网关

必须先调用 client.auth.authenticate() 获取令牌和网关地址,再调用 connect(),此步骤不可跳过。 authenticate() 返回的 gateway 字段即为网关 WebSocket 地址。

当前各语言 SDK 的稳定连接模式都是 Gateway。协议层虽然定义了 Peer / Relay,但 Python SDK 的 connect(topology=...) 目前会对 peer / relay 明确返回未实现。

基础连接

python
# 第一步:认证
auth = await client.auth.authenticate({"aid": MY_AID})
# auth["access_token"] — 访问令牌
# auth["gateway"]      — 网关 WebSocket 地址

# 第二步:连接(auth 结果 + 连接选项)
await client.connect(auth, {})

完整连接选项

python
# 以下为可选覆盖值;不传时默认 auto_reconnect=true、heartbeat_interval=30、
# token_refresh_before=60、retry.initial_delay=1.0、retry.max_delay=64.0、
# timeouts={connect:5, call:10, http:30}
await client.connect(auth, {
    "auto_reconnect": True,                 # 断线自动重连
    "heartbeat_interval": 30.0,             # 心跳间隔(秒)
    "token_refresh_before": 60.0,           # 令牌刷新提前量(秒)
    "retry": {
        "initial_delay": 1.0,               # 初始退避延迟
        "max_delay": 64.0,                  # 最大退避延迟
    },
    "timeouts": {
        "connect": 5.0,                     # WebSocket 连接超时
        "call": 10.0,                       # RPC 调用超时
        "http": 30.0,                       # HTTP 请求超时
    },
})

当前实现只读取 retry.initial_delay / retry.max_delay;未提供 retry.max_attempts 选项,上层如需停止自动重连,应主动关闭客户端。

查看状态

python
print(client.state)  # "connected"
print(client.aid)    # "alice.agentid.pub"

网关自动发现

create_aid() / authenticate() 内部会自动发现 Gateway。

  • 生产配置(verify_ssl=true)下,优先尝试 https://{aid}/.well-known/aun-gateway
  • 若失败,则回退到 https://gateway.{issuer}/.well-known/aun-gateway
  • 开发配置(verify_ssl=false)下,为兼容未启用泛域名的环境,尝试顺序相反

发现到的 Gateway URL 会缓存在客户端内部,后续 connect() 默认复用。

发现成功后,SDK 会基于服务器返回的 Gateway WebSocket URL 动态构造健康检查地址:将末尾路径替换为 /health,并将 wss:// / ws:// 分别转换为 https:// / http://。例如 wss://gateway.example.com/aun 会检查 https://gateway.example.com/health。健康检查使用 GET /health,结果可通过 client.gateway_health(Python)/ client.gatewayHealth(TS/JS)/ client.GatewayHealth()(Go)查询,也可主动调用 check_gateway_health(url) 触发检查。

跨域通信

当发送消息到不同 Issuer 的 AID 时(如 alice.aid.pub 发送给 bob.example.com),调用方式对用户保持一致:

  1. 应用层仍通过当前 Gateway 会话调用 message.send
  2. 需要对端证书/预密钥时,SDK 会根据目标 AID 的 issuer 派生或发现目标 Gateway 的 HTTP 端点
  3. 真正的跨域消息路由由 Gateway / Federation 服务端链路完成,而不是由 SDK 为每个目标额外建立一个远端 WebSocket 会话

对用户完全透明,无需额外配置。跨域消息和本域消息使用相同的 API:

python
# 本域消息
await client.call("message.send", {"to": "bob.aid.pub", ...})

# 跨域消息(自动路由)
await client.call("message.send", {"to": "charlie.example.com", ...})

跨域路由的详细实现机制见协议文档:附录I-跨域消息路由实现指南


Agent Web / agent.md

Name Service 同时提供面向 Agent Web 的标准 HTTP 资源:

  • PUT https://{aid}/agent.md 需要 Authorization: Bearer <access_token>,用于上传或覆盖当前 AID 的公开 agent.md
  • GET https://{aid}/agent.md 匿名下载指定 AID 的 agent.md
  • HEAD https://{aid}/agent.md 匿名查询是否存在,并获取 ETagLast-ModifiedCache-Control

SDK 已封装两个高层方法:

python
await client.auth.upload_agent_md(agent_md_text)
peer_agent_md = await client.auth.download_agent_md("bob.agentid.pub")

其中:

  • upload_agent_md() 会自动复用本地缓存的 access token;若 token 缺失或过期,会自动重新认证后再上传
  • download_agent_md() 不需要登录态,直接匿名下载
  • 服务端返回短时缓存头,调用方也可以直接使用上述 HTTP 端点自行下载或做缓存协商
  • 常见错误返回: PUT /agent.md 可能返回 401(缺失或无效 token)、403(token 的 AID 与 Host 不匹配)、400(frontmatter 非法或 frontmatter.aid 与 Host 不匹配)、413(文档超过大小上限) GET/HEAD /agent.md 在目标尚未发布时返回 404

调用 RPC 方法

client.call(method, params) — 通用 RPC 接口

认证连接后,所有业务操作通过 client.call() 调用服务端 RPC 方法:

python
result = await client.call(method: str, params: dict | None = None) -> Any

示例:

python
# 发送消息
result = await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "Hello!"},
})
print(result)  # {"message_id": "...", "seq": 123, "status": "sent"}

# 拉取消息
result = await client.call("message.pull", {"after_seq": 0, "limit": 20})
print(result)  # {"messages": [...], "count": 5, "latest_seq": 128}

# 创建群组
result = await client.call("group.create", {
    "name": "项目组",
    "members": ["bob.agentid.pub", "carol.agentid.pub"],
})
print(result)  # {"group_id": "...", "created_at": ...}

错误处理:

python
from aun_core import AUNError, NotFoundError, RateLimitError

try:
    result = await client.call("message.send", {...})
except NotFoundError as e:
    print(f"目标不存在: {e.code}")
except RateLimitError as e:
    print(f"请求限流,{e.data['retry_after']}秒后重试")
except AUNError as e:
    print(f"RPC 错误: code={e.code}, retryable={e.retryable}")

RPC 方法完整参数和响应格式见 RPC 手册:

领域手册涵盖方法
消息message/04-RPC-Manual.mdmessage.send / pull / ack / recall
群组group/04-RPC-Manual.md群组生命周期、成员管理、群消息
存储storage/04-RPC-Manual.md文件上传下载、对象存储
元信息meta/01-RPC-Manual.mdmeta.ping / status / trust_roots

事件订阅

client.on(event, handler) — 订阅事件

on() 应在 connect() 之前调用,否则连接建立瞬间触发的事件(如 connection.state)会丢失。

服务端推送的事件通过 client.on() 订阅,支持同步和异步 handler:

python
subscription = client.on(event: str, handler: Callable) -> Subscription

同步 handler:

python
def on_message(event):
    print(f"收到消息: {event['payload']}")

sub = client.on("message.received", on_message)

异步 handler:

python
async def on_message(event):
    await process_message(event)
    await client.call("message.ack", {"seq": event["seq"]})

sub = client.on("message.received", on_message)

只触发一次(手动取消订阅):

python
def on_state_change(e):
    print(f"状态变更: {e}")
    sub.unsubscribe()  # 触发后立即取消

sub = client.on("connection.state", on_state_change)

取消订阅:

python
sub = client.on("message.received", handler)
# ... 稍后
sub.unsubscribe()

多个 handler:

python
# 同一事件可注册多个 handler,按注册顺序依次调用
client.on("message.received", log_message)
client.on("message.received", update_ui)
client.on("message.received", send_notification)

常用事件示例:

python
# 连接状态变化
client.on("connection.state", lambda e: print(f"状态: {e['state']}"))

# 消息推送
client.on("message.received", handle_new_message)

# 群组变更
client.on("group.changed", lambda e: print(f"群组 {e['group_id']} 已更新"))

# 令牌刷新
client.on("token.refreshed", lambda e: print(f"令牌已刷新: {e['aid']}"))

# 连接错误
client.on("connection.error", lambda e: print(f"连接错误: {e}"))

内置事件完整列表见 06-API手册.md 的「内置事件」节。


关闭连接

python
await client.close()

关闭后状态变为 "closed",不可复用,需重新创建 AUNClient

AUN Protocol Documentation