Skip to content

AUN SDK Python - E2EE 加密通信


默认行为

SDK 默认开启端到端加密:

  • P2P 消息message.send)、P2P 思考内容message.thought.put)、群组消息group.send)和群思考内容group.thought.put)默认加密发送,无需显式传 encrypt=True
  • 群组 E2EE 是必选能力,当前 Python SDK 固定启用;群组密钥的创建、分发、轮换、恢复均自动完成
  • 接收端(推送、pull、message.thought.getgroup.thought.get)均自动解密,无需额外操作

如需发送明文消息,显式传入 encrypt=False

python
# 发送明文 P2P 消息
await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "这是明文"},
    "encrypt": False,
})

# 发送明文群消息
await client.call("group.send", {
    "group_id": "g-abc123.agentid.pub",
    "payload": {"type": "text", "text": "这是明文"},
    "encrypt": False,
})

发送加密消息

message.send 默认加密发送,SDK 自动完成加密:

python
await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "秘密消息"},
})

SDK 优先使用 prekey_ecdh_v2,并默认要求前向保密:

  1. prekey_ecdh_v2 — 对方有预上传的 prekey,四路 ECDH(ephemeral×prekey + ephemeral×identity + sender×prekey + sender×identity),前向安全,附带发送方签名
  2. long_term_key — 对方无 prekey,双路 ECDH(ephemeral×recipient_identity + sender×recipient_identity)+ HKDF 派生密钥(降级模式),附带发送方签名

Python SDK 默认 require_forward_secrecy=true,当加密结果不满足前向保密(无论是因为无 prekey 还是 prekey 加密失败降级到 long_term_key)时拒绝发送并抛出错误。需显式配置 require_forward_secrecy=false 才允许降级。

每条消息独立生成临时 ECDH 密钥对,实现一消息一密钥。

ProtectedHeaders 与可验证上下文

protected_headers 是 E2EE 信封里的可选元数据字典,语义接近 HTTP headers:适合放 device_idslot_iddevice_nameossdk_versionapp_name 等需要被接收端识别、且需要防篡改的非业务内容。payload 仍然类似 HTTP body,业务内容应放在 payload 内。

protected_headers 会随 E2EE 信封发送,接收端可以读取,因此它提供完整性保护,不提供机密性保护。不要把访问令牌、私钥、隐私正文或其他只允许端到端可见的内容放入 protected_headers;这类内容应放进加密的 payload

推荐通过 SDK 实例级 setter 设置稳定元数据,例如 client.set_protected_headers(...) / client.setProtectedHeaders(...) / client.SetProtectedHeaders(...)。发送方也可以在以下 SDK 调用中传入顶层 protected_headers 作为单次发送的高级覆盖;headers 仅作为兼容旧调用的别名,不推荐新代码使用:

  • message.send
  • message.thought.put
  • group.send
  • group.thought.put

payload_type 不需要应用层传入。SDK 会读取加密前 payload.type,自动写入 protected_headers.payload_type,接收端解密后会校验它与明文 payload.type 一致。

protected_headers / headers 是 send/thought 参数的顶层字段,不放入单独的 envelope 入参对象,也不属于业务 payload。裸 WebSocket 客户端若自行发送已加密信封,需要把 protected headers 放在自构造的 E2EE 信封内并自行完成 _auth,服务端不会替裸 RPC 调用生成或校验明文侧的 protected headers。

示例:

python
from aun_core import ProtectedHeaders

headers = (
    ProtectedHeaders()
    .set("device_id", "dev-123")
    .set("slot_id", "desktop")
    .set("sdk_version", "0.4.5")
    .set("app_name", "my-agent")
)

await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "秘密消息"},
    "protected_headers": headers,
})

应用层也可以直接传普通字典:

python
await client.call("group.send", {
    "group_id": "10001.example.com",
    "payload": {"type": "text", "text": "群组消息"},
    "protected_headers": {"device_id": "dev-123", "slot_id": "desktop"},
})

防篡改机制

为兼容旧 E2EE 信封,protected_headerscontext 不加入原有信封整体 AAD。它们各自带一个 _auth 字段,自包含完整性校验信息:

json
{
  "type": "e2ee.encrypted",
  "ciphertext": "...",
  "protected_headers": {
    "device_id": "dev-123",
    "slot_id": "desktop",
    "payload_type": "text",
    "_auth": {
      "alg": "HMAC-SHA256",
      "tag": "base64..."
    }
  },
  "context": {
    "type": "run",
    "id": "run-xxx",
    "_auth": {
      "alg": "HMAC-SHA256",
      "tag": "base64..."
    }
  }
}

计算规则:

  1. 解密流程会派生出本条消息的 message_key
  2. metadata_key = HMAC-SHA256(message_key, "aun-envelope-metadata-key-v1")
  3. 对字典去掉 _auth 后做 canonical JSON:UTF-8、key 排序、紧凑分隔符。
  4. tag = HMAC-SHA256(metadata_key, domain + "\0" + canonical_json(body))
  5. domainprotected_headersaun-protected-headers-v1,对 contextaun-protected-context-v1

只有持有本条消息密钥的发送端和接收端能生成或验证 _auth.tag。中间服务可以看到这些元数据,但不能在不破坏校验的情况下修改它们。

兼容策略:

  • 老消息没有 protected_headers / context 时照常解密。
  • 新消息一旦携带 protected_headers 或 E2EE 信封内的 context,就必须携带对应 _auth
  • _auth 校验失败、payload_type 与解密后 payload.type 不一致,或信封内 context 与外层 thought selector 不一致时,SDK 视为解密失败。

接收端读取

SDK 会在验签/验 _auth 后,把 _auth 去掉,只把业务可见字段回传给应用层:

python
msg = result["messages"][0]
headers = msg.get("e2ee", {}).get("protected_headers", {})
device_id = headers.get("device_id")
payload_type = headers.get("payload_type")

对于 thought,顶层 context.type + context.id 仍是服务端定位 thought head 的 selector;E2EE 信封内的 context 只是对这个 selector 的端到端完整性绑定。

P2P 思考内容

P2P 思考内容不是普通消息,不广播、不进 message.pull、不分配 seq、无需 ack,也不持久化。它只通过 message.thought.put/get 读写,并强制使用 P2P E2EE。

thought selector 只使用顶层 context.type + context.id,推荐 {"type": "run", "id": "run-xxx"}。如果需要展示被引用消息摘要,应放在加密后的 payload.quotepayload.client_context 中,不作为服务端 selector。

python
await client.call("message.thought.put", {
    "to": "bob.agentid.pub",
    "context": {"type": "run", "id": "run-xxx"},
    "payload": {"type": "thought", "text": "先核对 Bob 的约束,再输出答复"},
})

读取对方写给当前用户的 thought 时,指定 sender_aid 和同一个 context

python
result = await client.call("message.thought.get", {
    "sender_aid": "bob.agentid.pub",
    "context": {"type": "run", "id": "run-xxx"},
})

读取自己写给对方的 thought 时,还需要指定 peer_aidto

python
result = await client.call("message.thought.get", {
    "sender_aid": "alice.agentid.pub",
    "peer_aid": "bob.agentid.pub",
    "context": {"type": "run", "id": "run-xxx"},
})

SDK 返回的 result["thoughts"] 是已解密数组。message.thought.get 是查询操作,重复读取同一条 thought 不按消息 replay 消费处理。

群组加密消息

SDK 的群组 E2EE 编排对使用者透明:

  • 建群group.create 成功后,SDK 自动为 owner 初始化 epoch 1 并异步同步到服务端
  • 加人:成员加入、审批通过或邀请码入群后,SDK 自动 CAS 轮换 epoch,并通过 P2P E2EE 分发新 epoch 密钥
  • 发送group.send 默认加密,建群后即可立即发送
  • 思考内容group.thought.put 强制走群 E2EE;group.thought.get 返回前由 SDK 解密为 thoughts[]

前置配置

群组 E2EE 是必选能力,所有客户端必须支持。当前 Python SDK 固定启用,无需也不能通过配置关闭;成员加入时也固定轮换 epoch,无应用层开关:

python
client = AUNClient(aid)

重要:所有客户端必须声明群组 E2EE 能力。服务端当前仅对在线客户端做能力校验;离线客户端入群时不做强制检查。消息是否加密由发送者自主决定。

发送

group.send 默认加密发送:

python
await client.call("group.send", {
    "group_id": "g-abc123.agentid.pub",
    "payload": {"type": "text", "text": "群组加密消息"},
})

SDK 自动处理:

  • 建群后自动创建群组密钥(group.create 返回后)
  • 加人后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.add_member 返回后)
  • 踢人后自动 CAS 轮换密钥并分发给剩余成员(group.kick 返回后)
  • 成员退群后由剩余在线 admin/owner 收到 group.changed 事件后自动 CAS 轮换密钥(离开者自身不执行轮换)
  • 审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.review_join_request 返回后)
  • 批量审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.batch_review_join_request 返回后)
  • 通过邀请码入群(group.use_invite_code)后,SDK 自动向群内 admin/owner 发起密钥恢复请求;恢复是异步过程,后续 pull 或再次收到群消息时才能成功解密

接收

推送和 pull 均自动解密:

python
# 推送
client.on("group.message_created", lambda msg: print(msg["payload"]))

# Pull
result = await client.call("group.pull", {"group_id": "g-abc123.agentid.pub", "after_message_seq": 0})
for msg in result["messages"]:
    if msg.get("e2ee"):
        print(f"加密消息: {msg['payload']}")

群思考内容

群思考内容不是普通群消息,不广播、不进 group.pull、不分配 seq、无需 ack,也不持久化。它只通过 group.thought.put/get 读写,并强制使用群组 E2EE。

群 thought selector 同样只使用顶层 context.type + context.id

python
await client.call("group.thought.put", {
    "group_id": "g-abc123.agentid.pub",
    "context": {"type": "run", "id": "run-xxx"},
    "payload": {"type": "thought", "text": "正在比较两个候选方案"},
})

读取时必须指定 thought 作者和同一个 context

python
result = await client.call("group.thought.get", {
    "group_id": "g-abc123.agentid.pub",
    "sender_aid": "alice.agentid.pub",
    "context": {"type": "run", "id": "run-xxx"},
})

SDK 返回的 result["thoughts"] 是已解密数组。group.thought.get 是查询操作,重复读取同一条 thought 不按消息 replay 消费处理。

手动操作(通常不需要)

rotate_epoch() 是纯本地密码学 API。多客户端并发环境下,应通过服务端 CAS(group.e2ee.get_epoch + group.e2ee.rotate_epoch)确保原子性,SDK 自动编排已使用 CAS 路径。

python
# 手动轮换 epoch(纯本地,不经过服务端 CAS 校验)
info = client.group_e2ee.rotate_epoch("g-abc123.agentid.pub", member_aids)

# 查询状态
client.group_e2ee.has_secret("g-abc123.agentid.pub")       # 是否持有密钥
client.group_e2ee.current_epoch("g-abc123.agentid.pub")     # 当前 epoch
client.group_e2ee.get_member_aids("g-abc123.agentid.pub")   # 已知成员列表

接收加密消息

推送接收

python
def handler(msg):
    if msg.get("encrypted"):
        print(f"加密消息: {msg['payload']}")

client.on("message.received", handler)

Pull 接收

message.pull 返回的消息已自动解密:

python
result = await client.call("message.pull", {"after_seq": 0, "limit": 50})
for msg in result["messages"]:
    if msg.get("encrypted"):
        print(f"加密消息: {msg['payload']}")
        print(f"加密模式: {msg['e2ee']['encryption_mode']}")

Prekey 管理

连接时 SDK 自动上传 prekey,并定时轮换(默认每小时)。一般无需手动管理。

手动上传 prekey:

python
# 通过 AUNClient 上传(生成 + RPC)
await client._upload_prekey()

底层 API(E2EEManager 只生成材料,不做 RPC):

python
prekey_material = client.e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 需要自行上传:await transport.call("message.e2ee.put_prekey", prekey_material)

---

## 裸 WebSocket 开发者指南

> 本节面向不使用 AUNClient 连接管理、自行管理 WebSocket 连接的开发者。

### 架构说明

`E2EEManager` 是纯密码学工具类,无 I/O 依赖。构造函数只需:

- `identity_fn` — 返回当前身份信息的函数
- `keystore` — 密钥存储实现

所有 I/O(获取证书、获取 prekey、RPC 调用)由调用方自行处理。

### 实例化 E2EEManager

```python
from aun_core.e2ee import E2EEManager
from aun_core.keystore.local_token_store import LocalTokenStore

e2ee = E2EEManager(
    identity_fn=lambda: my_identity,
    keystore=LocalTokenStore("~/.aun/myapp"),
)

加密消息

python
# 1. 获取对方证书(HTTP)和 prekey(RPC,可选)
peer_cert_pem = await fetch_cert(peer_aid)        # 调用方实现
prekey = await fetch_prekey(peer_aid)              # 调用方实现,可能为 None

# 2. 加密(传入 prekey 会自动缓存,后续可传 None 复用缓存)
envelope, ok = e2ee.encrypt_message(
    to_aid="bob.agentid.pub",
    payload={"type": "text", "text": "秘密消息"},
    peer_cert_pem=peer_cert_pem,
    prekey=prekey,
)

# 3. 通过自己的 WebSocket 发送
aad = envelope["aad"]
await my_rpc_call("message.send", {
    "to": "bob.agentid.pub",
    "payload": envelope,
    "type": "e2ee.encrypted",
    "encrypted": True,
    "message_id": aad["message_id"],
    "timestamp": aad["timestamp"],
})

P2P 消息的投递语义来自连接阶段声明的 delivery_mode

  • fanout:广播到在线实例,并保留离线历史
  • queue:只做单实例实时消费,不进入历史

解密消息

python
# 解密单条消息(内置本地防重放)
decrypted = e2ee.decrypt_message(raw_message)
if decrypted is not None:
    print(decrypted["payload"])  # 明文

Prekey 缓存

E2EEManager 内置 prekey 缓存(TTL 默认 1 小时):

python
# 首次传入 prekey → 自动缓存
envelope, ok = e2ee.encrypt_message(..., prekey=fetched_prekey)

# 后续传 None → 自动复用缓存
envelope, ok = e2ee.encrypt_message(..., prekey=None)

# 手动管理
e2ee.cache_prekey("bob.agentid.pub", prekey_dict)
cached = e2ee.get_cached_prekey("bob.agentid.pub")
e2ee.invalidate_prekey_cache("bob.agentid.pub")

生成 Prekey

python
prekey_material = e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 自行上传到服务端
await my_rpc_call("message.e2ee.put_prekey", prekey_material)

群组 E2EE(GroupE2EEManager)

GroupE2EEManager 是群组 E2EE 的纯工具类,与 E2EEManager 平行。密码学和状态管理全自动,网络发送由调用方负责。

实例化

python
from aun_core.e2ee import GroupE2EEManager
from aun_core.keystore.local_token_store import LocalTokenStore

group_e2ee = GroupE2EEManager(
    identity_fn=lambda: my_identity,
    keystore=LocalTokenStore("~/.aun/myapp"),
)

建群后创建 epoch 并分发

python
info = group_e2ee.create_epoch(group_id, member_aids)
# info = {epoch, commitment, distributions: [{to, payload}, ...]}

for dist in info["distributions"]:
    # 通过 P2P E2EE 发送密钥分发消息
    envelope, _ = e2ee.encrypt_message(
        to_aid=dist["to"], payload=dist["payload"],
        peer_cert_pem=fetch_cert(dist["to"]),
    )
    await my_rpc_call("message.send", {
        "to": dist["to"], "payload": envelope,
        "type": "e2ee.encrypted", "encrypted": True,
        ...
    })

加密群消息

python
envelope = group_e2ee.encrypt(group_id, {"type": "text", "text": "hello"})
await my_rpc_call("group.send", {
    "group_id": group_id,
    "payload": envelope,
    "type": "e2ee.group_encrypted",
    "encrypted": True,
})

解密群消息

python
decrypted = group_e2ee.decrypt(raw_group_message)
# 内置防重放 + 外层 group_id/from/sender_aid 校验
# 非加密消息原样返回,解密失败返回 None

# 批量解密(用于 group.pull)
results = group_e2ee.decrypt_batch(messages)

处理 P2P 密钥消息

所有密钥协议消息(分发/请求/响应)通过 P2P E2EE 传输。收到后先 P2P 解密,再交给 handle_incoming

python
inner = e2ee.decrypt_message(p2p_message)   # P2P 层解密
result = group_e2ee.handle_incoming(inner["payload"])

if result == "distribution":
    pass  # 密钥已自动存储
elif result == "distribution_rejected":
    pass  # epoch 降级被拒
elif result == "response":
    pass  # 密钥恢复响应已存储
elif result == "request":
    # 需要回复:查成员列表 → 构建响应 → P2P 发送
    members = get_group_members(group_id)
    response = group_e2ee.handle_key_request_msg(inner["payload"], members)
    if response:
        p2p_e2ee_send(inner["payload"]["requester_aid"], response)

踢人后轮换

python
info = group_e2ee.rotate_epoch(group_id, remaining_member_aids)
for dist in info["distributions"]:
    p2p_e2ee_send(dist["to"], dist["payload"])

配合服务端 CAS 防脑裂(推荐):

python
# 1. 读当前 epoch
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
# 2. CAS 递增(服务端校验 admin/owner 角色 + 原子递增 + rotation_signature 验签)
cas = await my_rpc_call("group.e2ee.rotate_epoch", {
    "group_id": group_id, "current_epoch": status["epoch"],
    # Python SDK 自动附加 rotation_signature 和 rotation_timestamp
    # 裸客户端必须自行签名:sign("{group_id}|{current_epoch}|{new_epoch}|{aid}|{timestamp}")
})
if cas["success"]:
    info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], member_aids)
    for dist in info["distributions"]:
        p2p_e2ee_send(dist["to"], dist["payload"])
# CAS 失败说明别的 admin 先轮换了,放弃即可

加人后轮换并分发密钥

python
# 成员加入改变成员集,推荐先通过服务端 CAS 推进 epoch。
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
cas = await my_rpc_call("group.e2ee.rotate_epoch", {"group_id": group_id, "current_epoch": status["epoch"], ...})
if cas["success"]:
    info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], updated_member_aids)
    for dist in info["distributions"]:
        p2p_e2ee_send(dist["to"], dist["payload"])

解密失败时请求恢复

当前 Python SDK 优先请求本地成员列表中的第一个候选者;零状态时退化为请求当前消息发送者。

python
recovery = group_e2ee.build_recovery_request(
    group_id, epoch, sender_aid=msg.get("sender_aid"),
)
if recovery:
    p2p_e2ee_send(recovery["to"], recovery["payload"])
# 频率限制:同群同 epoch 30 秒内不重复请求

状态查询

python
group_e2ee.has_secret(group_id)        # 是否持有密钥
group_e2ee.current_epoch(group_id)     # 当前 epoch,无密钥返回 None
group_e2ee.get_member_aids(group_id)   # 已知成员列表
group_e2ee.load_all_secrets(group_id)  # {epoch: secret_bytes} 映射
group_e2ee.cleanup(group_id)           # 清理过期旧 epoch(默认保留 7 天)

自定义密钥存储

实现 KeyStore Protocol,可替换默认的文件存储:

python
class MyKeyStore:
    def load_key_pair(self, aid: str) -> dict | None: ...
    def save_key_pair(self, aid: str, key_pair: dict) -> None: ...
    def load_cert(self, aid: str) -> str | None: ...
    def save_cert(self, aid: str, cert_pem: str) -> None: ...
    def load_identity(self, aid: str) -> dict | None: ...
    def save_identity(self, aid: str, identity: dict) -> None: ...
    def load_metadata(self, aid: str) -> dict | None: ...
    def save_metadata(self, aid: str, metadata: dict) -> None: ...

client = AUNClient(aid)

AUNClient 不接收外部 keystore 参数,也不再通过配置字典选择本地身份。若需替换 KeyStore,应按 SDK 内部接口规范扩展实现,再由对应语言 SDK 的内部装配层接入。


自定义敏感数据存储

实现 SecretStore Protocol。默认行为:Windows 使用 DPAPI,其他平台使用内存存储。

python
class MySecretStore:
    def protect(self, scope: str, name: str, plaintext: bytes) -> dict: ...
    def reveal(self, scope: str, name: str, record: dict) -> bytes | None: ...
    def clear(self, scope: str, name: str) -> None: ...

client = AUNClient(aid)

AUNClient 构造函数只接收可选 AID 对象。SecretStore / KeyStore / SQLiteBackup 属于 SDK 内部基础设施,不作为应用层构造参数暴露。


当前安全默认值

默认行为说明
P2P 消息默认要求发送方签名sender_signature 的消息被拒绝
群组消息默认要求发送方签名require_signature=True,无签名或无发送方证书的消息被拒绝
群组 E2EE 为固定启用能力group_e2ee=true,不可关闭
默认要求前向保密require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级
客户端操作签名group.send/group.kick/group.add_member/group.leave 等操作自动附加 client_signature,服务端强制验签

AUN Protocol Documentation