Skip to content

AUN SDK Python - WebSocket 协议

本章介绍 AUN Gateway 的 WebSocket 协议细节。掌握这些内容后,可以用任何语言(或直接用 websockets 库)实现客户端,不依赖 SDK 的高层 API。


连接握手

(1) challenge — 服务器发送挑战

连接建立后,Gateway 立即发送:

json
{
    "jsonrpc": "2.0",
    "method": "challenge",
    "params": {
        "nonce": "随机字符串(防重放)",
        "server": "kite-gateway",
        "protocol": { "min": "1.0", "max": "1.0" },
        "auth_methods": ["pairing_code", "kite_token", "aid"],
        "capabilities": { ... },
        "server_time": 1774878000.123
    }
}

(2) auth.connect — 客户端认证

客户端回复,携带 token 和从 challenge 中获取的 nonce:

json
{
    "jsonrpc": "2.0",
    "id": "rpc-xxxx",
    "method": "auth.connect",
    "params": {
        "nonce": "从 challenge 中获取",
        "auth": {
            "method": "kite_token",
            "token": "authenticate() 返回的 access_token"
        },
        "protocol": { "min": "1.0", "max": "1.0" },
        "device": { "id": "来自 ~/.aun/.device_id", "type": "desktop" },
        "client": { "slot_id": "slot-a" },
        "delivery_mode": {
            "mode": "queue",
            "routing": "sender_affinity",
            "affinity_ttl_ms": 300000
        },
        "options": { "kind": "long" },
        "capabilities": {
            "e2ee": true,
            "group_e2ee": true
        }
    }
}

说明:

  • protocol.min/maxauth.connect 阶段完成 Gateway 会话版本协商;详细规则见协议文档 03-Gateway-连接模式.md
  • device.id 是设备级稳定标识,Python SDK 默认从 ~/.aun/.device_id 读取。
  • client.slot_id 由应用层显式传入,用于区分同设备上的多个实例槽位。SDK 允许 /:、空格作为隔离键分隔符,例如 evolclaw clievolclaw/clievolclaw:cli 的隔离键都是 evolclaw
  • delivery_mode 决定该 AID 当前连接的投递语义;同一 AID 的所有在线连接必须保持一致。
  • options.kind 声明连接类型:"long"(默认)= 长连接,承担服务端推送 / 事件订阅;"short" = 短连接,仅用于发送 RPC 并等待响应即断开。同 (aid, device.id, slotIsolationKey(client.slot_id)) 隔离槽下,长连接最多 1 条,短连接最多 10 条;短连接不会顶掉长连接。
  • options.short_ttl_ms 仅在 kind="short" 时有效,可选;服务端兜底超时后主动关闭短连接,防止占名额。
  • capabilities 是客户端能力声明;hello-ok.result.capabilities 是服务端能力公告,不是双方能力交集。

(3) hello-ok — 握手完成

认证成功后 Gateway 返回:

json
{
    "jsonrpc": "2.0",
    "id": "rpc-xxxx",
    "result": {
        "status": "ok",
        "protocol": "1.0",
        "server_time": 1774878000.456,
        "authenticated": true,
        "identity": {
            "module_id": "gateway-client-xxxx",
            "role": "agent",
            "aid": "alice1234.agentid.pub"
        },
        "connection": {
            "id": "conn_gateway-client-xxxx",
            "device_id": "dev-001",
            "kind": "long"
        },
        "capabilities": { ... }
    }
}

消息格式

所有消息遵循 JSON-RPC 2.0 格式,通过以下规则区分类型:

类型特征示例
RPC 请求id + method{"id": "rpc-1", "method": "message.send", "params": {...}}
RPC 响应id + result/error{"id": "rpc-1", "result": {...}}
事件通知idmethodevent/ 开头{"method": "event/message.received", "params": {...}}

RPC 请求

json
{
    "jsonrpc": "2.0",
    "id": "rpc-随机hex",
    "method": "message.send",
    "params": {
        "to": "bob.agentid.pub",
        "payload": {"type": "text", "text": "Hello"}
    }
}

RPC 响应

成功:

json
{
    "jsonrpc": "2.0",
    "id": "rpc-随机hex",
    "result": {
        "message_id": "uuid",
        "seq": 1,
        "status": "delivered"
    }
}

错误:

json
{
    "jsonrpc": "2.0",
    "id": "rpc-随机hex",
    "error": {
        "code": -32603,
        "message": "错误描述"
    }
}

事件通知

服务器推送,无 id 字段:

json
{
    "jsonrpc": "2.0",
    "method": "event/message.received",
    "params": {
        "from": "alice.agentid.pub",
        "to": "bob.agentid.pub",
        "payload": {"type": "text", "text": "Hello"}
    }
}

完整示例

以下示例仅用 SDK 完成认证,其余所有操作(连接、握手、RPC 调用、事件接收)全部通过裸 WebSocket 实现:

python
import asyncio, json, random, secrets
from datetime import datetime
from aun_core import AIDStore, AUNClient
from aun_core.errors import AUNError, ConnectionError, AuthError
import websockets

DOMAIN = "agentid.pub"
ALICE = f"alice{random.randint(1000,9999)}.{DOMAIN}"
BOB = f"bob{random.randint(1000,9999)}.{DOMAIN}"


def ts():
    return datetime.now().strftime("%H:%M:%S.%f")[:-3]


def make_rpc(method: str, params: dict) -> tuple[str, str]:
    """构造 JSON-RPC 2.0 请求,返回 (rpc_id, json_str)"""
    rpc_id = f"rpc-{secrets.token_hex(4)}"
    msg = {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}
    return rpc_id, json.dumps(msg)


async def authenticate(aid: str) -> dict:
    """用 SDK 完成 AID 注册/加载和认证,返回 access_token + gateway。"""
    try:
        store = AIDStore(aun_path="~/.aun/ws-demo", encryption_seed="")
        loaded = store.load(aid)
        if not loaded["ok"]:
            registered = await store.register(aid)
            if not registered["ok"]:
                raise AuthError(registered["error"]["message"])
            loaded = store.load(aid)
        client = AUNClient(loaded["data"]["aid"])
        return await client.authenticate()
    except AuthError as e:
        print(f"[错误] 认证失败 ({aid}): {e}")
        raise
    except ConnectionError as e:
        print(f"[错误] 网络连接失败: {e}")
        raise


async def ws_connect(gateway_url: str, token: str) -> websockets.ClientConnection:
    """建立 WebSocket 连接并完成握手(challenge → auth.connect → hello-ok)"""
    ws = await websockets.connect(gateway_url, ping_interval=None)

    # 1. 接收 challenge
    raw = await ws.recv()
    challenge = json.loads(raw)
    nonce = challenge["params"]["nonce"]

    # 2. 发送 auth.connect
    rpc_id, req = make_rpc("auth.connect", {
        "nonce": nonce,
        "auth": {"method": "kite_token", "token": token},
        "protocol": {"min": "1.0", "max": "1.0"},
    })
    await ws.send(req)

    # 3. 接收 hello-ok
    raw = await ws.recv()
    hello = json.loads(raw)
    if "error" in hello:
        raise Exception(f"握手失败: {hello['error']}")
    identity = hello["result"]["identity"]
    print(f"[{ts()}] 已连接: {identity.get('aid')} (module={identity.get('module_id')})")
    return ws


async def ws_call(ws, method: str, params: dict, timeout: float = 10.0) -> dict:
    """发送 RPC 调用并等待响应(从消息流中匹配 id)"""
    rpc_id, req = make_rpc(method, params)
    await ws.send(req)
    deadline = asyncio.get_event_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_event_loop().time()
        if remaining <= 0:
            raise TimeoutError(f"RPC 超时: {method}")
        raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
        msg = json.loads(raw)
        if msg.get("id") == rpc_id:
            if "error" in msg:
                raise Exception(f"RPC 错误: {msg['error']}")
            return msg.get("result", {})


async def ws_wait_event(ws, event_name: str, timeout: float = 5.0) -> dict | None:
    """等待指定事件(method 字段以 event/ 为前缀)"""
    target = f"event/{event_name}"
    deadline = asyncio.get_event_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_event_loop().time()
        if remaining <= 0:
            return None
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
        except asyncio.TimeoutError:
            return None
        msg = json.loads(raw)
        if msg.get("method") == target:
            return msg.get("params", {})


async def main():
    alice_ws = None
    bob_ws = None

    try:
        # ── 第一阶段:用 SDK 认证,拿到 token + gateway ──
        alice_auth = await authenticate(ALICE)
        bob_auth = await authenticate(BOB)

        # ── 第二阶段:裸 WebSocket 连接(使用 auth 返回的 gateway)──
        try:
            alice_ws = await ws_connect(alice_auth["gateway"], alice_auth["access_token"])
            bob_ws = await ws_connect(bob_auth["gateway"], bob_auth["access_token"])
        except Exception as e:
            print(f"[错误] WebSocket 连接失败: {e}")
            raise

        # ── 第三阶段:Alice 发消息 ──
        try:
            result = await ws_call(alice_ws, "message.send", {
                "to": BOB,
                "payload": {"type": "text", "text": "Hello from Alice (raw WebSocket)!"},
            })
            print(f"[{ts()}] [Alice 发送] status={result.get('status')}, seq={result.get('seq')}")
        except TimeoutError as e:
            print(f"[错误] RPC 调用超时: {e}")
        except Exception as e:
            print(f"[错误] 发送消息失败: {e}")

        # ── 第四阶段:Bob 等待事件推送 ──
        event = await ws_wait_event(bob_ws, "message.received", timeout=5.0)
        if event:
            print(f"[{ts()}] [Bob 收到事件] {event.get('payload')}")
        else:
            # 事件未推送,主动拉取
            try:
                pull = await ws_call(bob_ws, "message.pull", {"after_seq": 0, "limit": 10})
                msgs = pull.get("messages", [])
                if msgs:
                    print(f"[{ts()}] [Bob 拉取] 收到 {len(msgs)} 条消息:")
                    for m in msgs:
                        print(f"  {m.get('payload')}")
                else:
                    print(f"[{ts()}] [Bob] 未收到消息")
            except Exception as e:
                print(f"[错误] 拉取消息失败: {e}")

        print(f"[{ts()}] 完成")

    except KeyboardInterrupt:
        print(f"\n[{ts()}] 用户中断")
    except Exception as e:
        print(f"[{ts()}] 程序异常: {e}")
        raise
    finally:
        # ── 关闭 WebSocket ──
        if alice_ws:
            try:
                await alice_ws.close()
            except Exception:
                pass
        if bob_ws:
            try:
                await bob_ws.close()
            except Exception:
                pass


asyncio.run(main())

AUN Protocol Documentation