AUN SDK Python - WebSocket 协议
本章介绍 AUN Gateway 的 WebSocket 协议细节。掌握这些内容后,可以用任何语言(或直接用 websockets 库)实现客户端,不依赖 SDK 的高层 API。
连接握手
(1) challenge — 服务器发送挑战
连接建立后,Gateway 立即发送:
json
{
"jsonrpc": "2.0",
"method": "challenge",
"params": {
"nonce": "随机字符串(防重放)",
"server": "kite-gateway",
"protocol": { "min": "1.0", "max": "1.0" },
"auth_methods": ["pairing_code", "kite_token", "aid"],
"capabilities": { ... },
"server_time": 1774878000.123
}
}(2) auth.connect — 客户端认证
客户端回复,携带 token 和从 challenge 中获取的 nonce:
json
{
"jsonrpc": "2.0",
"id": "rpc-xxxx",
"method": "auth.connect",
"params": {
"nonce": "从 challenge 中获取",
"auth": {
"method": "kite_token",
"token": "authenticate() 返回的 access_token"
},
"protocol": { "min": "1.0", "max": "1.0" },
"device": { "id": "来自 ~/.aun/.device_id", "type": "desktop" },
"client": { "slot_id": "slot-a" },
"delivery_mode": {
"mode": "queue",
"routing": "sender_affinity",
"affinity_ttl_ms": 300000
},
"options": { "kind": "long" },
"capabilities": {
"e2ee": true,
"group_e2ee": true
}
}
}说明:
protocol.min/max在auth.connect阶段完成 Gateway 会话版本协商;详细规则见协议文档03-Gateway-连接模式.md。device.id是设备级稳定标识,Python SDK 默认从~/.aun/.device_id读取。client.slot_id由应用层显式传入,用于区分同设备上的多个实例槽位。SDK 允许/、:、空格作为隔离键分隔符,例如evolclaw cli、evolclaw/cli、evolclaw:cli的隔离键都是evolclaw。delivery_mode决定该 AID 当前连接的投递语义;同一 AID 的所有在线连接必须保持一致。options.kind声明连接类型:"long"(默认)= 长连接,承担服务端推送 / 事件订阅;"short"= 短连接,仅用于发送 RPC 并等待响应即断开。同(aid, device.id, slotIsolationKey(client.slot_id))隔离槽下,长连接最多 1 条,短连接最多 10 条;短连接不会顶掉长连接。options.short_ttl_ms仅在kind="short"时有效,可选;服务端兜底超时后主动关闭短连接,防止占名额。capabilities是客户端能力声明;hello-ok.result.capabilities是服务端能力公告,不是双方能力交集。
(3) hello-ok — 握手完成
认证成功后 Gateway 返回:
json
{
"jsonrpc": "2.0",
"id": "rpc-xxxx",
"result": {
"status": "ok",
"protocol": "1.0",
"server_time": 1774878000.456,
"authenticated": true,
"identity": {
"module_id": "gateway-client-xxxx",
"role": "agent",
"aid": "alice1234.agentid.pub"
},
"connection": {
"id": "conn_gateway-client-xxxx",
"device_id": "dev-001",
"kind": "long"
},
"capabilities": { ... }
}
}消息格式
所有消息遵循 JSON-RPC 2.0 格式,通过以下规则区分类型:
| 类型 | 特征 | 示例 |
|---|---|---|
| RPC 请求 | 有 id + method | {"id": "rpc-1", "method": "message.send", "params": {...}} |
| RPC 响应 | 有 id + result/error | {"id": "rpc-1", "result": {...}} |
| 事件通知 | 无 id,method 以 event/ 开头 | {"method": "event/message.received", "params": {...}} |
RPC 请求
json
{
"jsonrpc": "2.0",
"id": "rpc-随机hex",
"method": "message.send",
"params": {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "Hello"}
}
}RPC 响应
成功:
json
{
"jsonrpc": "2.0",
"id": "rpc-随机hex",
"result": {
"message_id": "uuid",
"seq": 1,
"status": "delivered"
}
}错误:
json
{
"jsonrpc": "2.0",
"id": "rpc-随机hex",
"error": {
"code": -32603,
"message": "错误描述"
}
}事件通知
服务器推送,无 id 字段:
json
{
"jsonrpc": "2.0",
"method": "event/message.received",
"params": {
"from": "alice.agentid.pub",
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "Hello"}
}
}完整示例
以下示例仅用 SDK 完成认证,其余所有操作(连接、握手、RPC 调用、事件接收)全部通过裸 WebSocket 实现:
python
import asyncio, json, random, secrets
from datetime import datetime
from aun_core import AIDStore, AUNClient
from aun_core.errors import AUNError, ConnectionError, AuthError
import websockets
DOMAIN = "agentid.pub"
ALICE = f"alice{random.randint(1000,9999)}.{DOMAIN}"
BOB = f"bob{random.randint(1000,9999)}.{DOMAIN}"
def ts():
return datetime.now().strftime("%H:%M:%S.%f")[:-3]
def make_rpc(method: str, params: dict) -> tuple[str, str]:
"""构造 JSON-RPC 2.0 请求,返回 (rpc_id, json_str)"""
rpc_id = f"rpc-{secrets.token_hex(4)}"
msg = {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}
return rpc_id, json.dumps(msg)
async def authenticate(aid: str) -> dict:
"""用 SDK 完成 AID 注册/加载和认证,返回 access_token + gateway。"""
try:
store = AIDStore(aun_path="~/.aun/ws-demo", encryption_seed="")
loaded = store.load(aid)
if not loaded["ok"]:
registered = await store.register(aid)
if not registered["ok"]:
raise AuthError(registered["error"]["message"])
loaded = store.load(aid)
client = AUNClient(loaded["data"]["aid"])
return await client.authenticate()
except AuthError as e:
print(f"[错误] 认证失败 ({aid}): {e}")
raise
except ConnectionError as e:
print(f"[错误] 网络连接失败: {e}")
raise
async def ws_connect(gateway_url: str, token: str) -> websockets.ClientConnection:
"""建立 WebSocket 连接并完成握手(challenge → auth.connect → hello-ok)"""
ws = await websockets.connect(gateway_url, ping_interval=None)
# 1. 接收 challenge
raw = await ws.recv()
challenge = json.loads(raw)
nonce = challenge["params"]["nonce"]
# 2. 发送 auth.connect
rpc_id, req = make_rpc("auth.connect", {
"nonce": nonce,
"auth": {"method": "kite_token", "token": token},
"protocol": {"min": "1.0", "max": "1.0"},
})
await ws.send(req)
# 3. 接收 hello-ok
raw = await ws.recv()
hello = json.loads(raw)
if "error" in hello:
raise Exception(f"握手失败: {hello['error']}")
identity = hello["result"]["identity"]
print(f"[{ts()}] 已连接: {identity.get('aid')} (module={identity.get('module_id')})")
return ws
async def ws_call(ws, method: str, params: dict, timeout: float = 10.0) -> dict:
"""发送 RPC 调用并等待响应(从消息流中匹配 id)"""
rpc_id, req = make_rpc(method, params)
await ws.send(req)
deadline = asyncio.get_event_loop().time() + timeout
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
raise TimeoutError(f"RPC 超时: {method}")
raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
msg = json.loads(raw)
if msg.get("id") == rpc_id:
if "error" in msg:
raise Exception(f"RPC 错误: {msg['error']}")
return msg.get("result", {})
async def ws_wait_event(ws, event_name: str, timeout: float = 5.0) -> dict | None:
"""等待指定事件(method 字段以 event/ 为前缀)"""
target = f"event/{event_name}"
deadline = asyncio.get_event_loop().time() + timeout
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
return None
try:
raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
except asyncio.TimeoutError:
return None
msg = json.loads(raw)
if msg.get("method") == target:
return msg.get("params", {})
async def main():
alice_ws = None
bob_ws = None
try:
# ── 第一阶段:用 SDK 认证,拿到 token + gateway ──
alice_auth = await authenticate(ALICE)
bob_auth = await authenticate(BOB)
# ── 第二阶段:裸 WebSocket 连接(使用 auth 返回的 gateway)──
try:
alice_ws = await ws_connect(alice_auth["gateway"], alice_auth["access_token"])
bob_ws = await ws_connect(bob_auth["gateway"], bob_auth["access_token"])
except Exception as e:
print(f"[错误] WebSocket 连接失败: {e}")
raise
# ── 第三阶段:Alice 发消息 ──
try:
result = await ws_call(alice_ws, "message.send", {
"to": BOB,
"payload": {"type": "text", "text": "Hello from Alice (raw WebSocket)!"},
})
print(f"[{ts()}] [Alice 发送] status={result.get('status')}, seq={result.get('seq')}")
except TimeoutError as e:
print(f"[错误] RPC 调用超时: {e}")
except Exception as e:
print(f"[错误] 发送消息失败: {e}")
# ── 第四阶段:Bob 等待事件推送 ──
event = await ws_wait_event(bob_ws, "message.received", timeout=5.0)
if event:
print(f"[{ts()}] [Bob 收到事件] {event.get('payload')}")
else:
# 事件未推送,主动拉取
try:
pull = await ws_call(bob_ws, "message.pull", {"after_seq": 0, "limit": 10})
msgs = pull.get("messages", [])
if msgs:
print(f"[{ts()}] [Bob 拉取] 收到 {len(msgs)} 条消息:")
for m in msgs:
print(f" {m.get('payload')}")
else:
print(f"[{ts()}] [Bob] 未收到消息")
except Exception as e:
print(f"[错误] 拉取消息失败: {e}")
print(f"[{ts()}] 完成")
except KeyboardInterrupt:
print(f"\n[{ts()}] 用户中断")
except Exception as e:
print(f"[{ts()}] 程序异常: {e}")
raise
finally:
# ── 关闭 WebSocket ──
if alice_ws:
try:
await alice_ws.close()
except Exception:
pass
if bob_ws:
try:
await bob_ws.close()
except Exception:
pass
asyncio.run(main())
