Skip to content

AUN SDK Python - E2EE 加密通信


默认行为

SDK 默认开启端到端加密:

  • P2P 消息message.send)和群组消息group.send)默认加密发送,无需显式传 encrypt=True
  • 群组 E2EE 是必选能力,当前 Python SDK 固定启用;群组密钥的创建、分发、轮换、恢复均自动完成
  • 接收端(推送和 pull)均自动解密,无需额外操作

如需发送明文消息,显式传入 encrypt=False

python
# 发送明文 P2P 消息
await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "这是明文"},
    "encrypt": False,
})

# 发送明文群消息
await client.call("group.send", {
    "group_id": "g-abc123.agentid.pub",
    "payload": {"type": "text", "text": "这是明文"},
    "encrypt": False,
})

发送加密消息

message.send 默认加密发送,SDK 自动完成加密:

python
await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "秘密消息"},
})

SDK 优先使用 prekey_ecdh_v2,并默认要求前向保密:

  1. prekey_ecdh_v2 — 对方有预上传的 prekey,四路 ECDH(ephemeral×prekey + ephemeral×identity + sender×prekey + sender×identity),前向安全,附带发送方签名
  2. long_term_key — 对方无 prekey,双路 ECDH(ephemeral×recipient_identity + sender×recipient_identity)+ HKDF 派生密钥(降级模式),附带发送方签名

Python SDK 默认 require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级并抛出错误。需显式配置 require_forward_secrecy=false 才允许降级。

每条消息独立生成临时 ECDH 密钥对,实现一消息一密钥。

群组加密消息

SDK 的群组 E2EE 编排对使用者透明:

  • 建群group.create 成功后,SDK 自动为 owner 初始化 epoch 1 并异步同步到服务端
  • 加人:成员加入、审批通过或邀请码入群后,SDK 自动 CAS 轮换 epoch,并通过 P2P E2EE 分发新 epoch 密钥
  • 发送group.send 默认加密,建群后即可立即发送

前置配置

群组 E2EE 是必选能力,所有客户端必须支持。当前 Python SDK 固定启用,无需也不能通过配置关闭;成员加入时也固定轮换 epoch,无应用层开关:

python
client = AUNClient({})

重要:所有客户端必须声明群组 E2EE 能力。服务端当前仅对在线客户端做能力校验;离线客户端入群时不做强制检查。消息是否加密由发送者自主决定。

发送

group.send 默认加密发送:

python
await client.call("group.send", {
    "group_id": "g-abc123.agentid.pub",
    "payload": {"type": "text", "text": "群组加密消息"},
})

SDK 自动处理:

  • 建群后自动创建群组密钥(group.create 返回后)
  • 加人后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.add_member 返回后)
  • 踢人后自动 CAS 轮换密钥并分发给剩余成员(group.kick 返回后)
  • 成员退群后由剩余在线 admin/owner 收到 group.changed 事件后自动 CAS 轮换密钥(离开者自身不执行轮换)
  • 审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.review_join_request 返回后)
  • 批量审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(group.batch_review_join_request 返回后)
  • 通过邀请码入群(group.use_invite_code)后,SDK 自动向群内 admin/owner 发起密钥恢复请求;恢复是异步过程,后续 pull 或再次收到群消息时才能成功解密

接收

推送和 pull 均自动解密:

python
# 推送
client.on("group.message_created", lambda msg: print(msg["payload"]))

# Pull
result = await client.call("group.pull", {"group_id": "g-abc123.agentid.pub", "after_message_seq": 0})
for msg in result["messages"]:
    if msg.get("e2ee"):
        print(f"加密消息: {msg['payload']}")

手动操作(通常不需要)

rotate_epoch() 是纯本地密码学 API。多客户端并发环境下,应通过服务端 CAS(group.e2ee.get_epoch + group.e2ee.rotate_epoch)确保原子性,SDK 自动编排已使用 CAS 路径。

python
# 手动轮换 epoch(纯本地,不经过服务端 CAS 校验)
info = client.group_e2ee.rotate_epoch("g-abc123.agentid.pub", member_aids)

# 查询状态
client.group_e2ee.has_secret("g-abc123.agentid.pub")       # 是否持有密钥
client.group_e2ee.current_epoch("g-abc123.agentid.pub")     # 当前 epoch
client.group_e2ee.get_member_aids("g-abc123.agentid.pub")   # 已知成员列表

接收加密消息

推送接收

python
def handler(msg):
    if msg.get("encrypted"):
        print(f"加密消息: {msg['payload']}")

client.on("message.received", handler)

Pull 接收

message.pull 返回的消息已自动解密:

python
result = await client.call("message.pull", {"after_seq": 0, "limit": 50})
for msg in result["messages"]:
    if msg.get("encrypted"):
        print(f"加密消息: {msg['payload']}")
        print(f"加密模式: {msg['e2ee']['encryption_mode']}")

Prekey 管理

连接时 SDK 自动上传 prekey,并定时轮换(默认每小时)。一般无需手动管理。

手动上传 prekey:

python
# 通过 AUNClient 上传(生成 + RPC)
await client._upload_prekey()

底层 API(E2EEManager 只生成材料,不做 RPC):

python
prekey_material = client.e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 需要自行上传:await transport.call("message.e2ee.put_prekey", prekey_material)

---

## 裸 WebSocket 开发者指南

> 本节面向不使用 AUNClient 连接管理、自行管理 WebSocket 连接的开发者。

### 架构说明

`E2EEManager` 是纯密码学工具类,无 I/O 依赖。构造函数只需:

- `identity_fn` — 返回当前身份信息的函数
- `keystore` — 密钥存储实现

所有 I/O(获取证书、获取 prekey、RPC 调用)由调用方自行处理。

### 实例化 E2EEManager

```python
from aun_core.e2ee import E2EEManager
from aun_core.keystore.file import FileKeyStore

e2ee = E2EEManager(
    identity_fn=lambda: my_identity,
    keystore=FileKeyStore("~/.aun/myapp"),
)

加密消息

python
# 1. 获取对方证书(HTTP)和 prekey(RPC,可选)
peer_cert_pem = await fetch_cert(peer_aid)        # 调用方实现
prekey = await fetch_prekey(peer_aid)              # 调用方实现,可能为 None

# 2. 加密(传入 prekey 会自动缓存,后续可传 None 复用缓存)
envelope, ok = e2ee.encrypt_message(
    to_aid="bob.agentid.pub",
    payload={"type": "text", "text": "秘密消息"},
    peer_cert_pem=peer_cert_pem,
    prekey=prekey,
)

# 3. 通过自己的 WebSocket 发送
aad = envelope["aad"]
await my_rpc_call("message.send", {
    "to": "bob.agentid.pub",
    "payload": envelope,
    "type": "e2ee.encrypted",
    "encrypted": True,
    "message_id": aad["message_id"],
    "timestamp": aad["timestamp"],
})

P2P 消息的投递语义来自连接阶段声明的 delivery_mode

  • fanout:广播到在线实例,并保留离线历史
  • queue:只做单实例实时消费,不进入历史

解密消息

python
# 解密单条消息(内置本地防重放)
decrypted = e2ee.decrypt_message(raw_message)
if decrypted is not None:
    print(decrypted["payload"])  # 明文

Prekey 缓存

E2EEManager 内置 prekey 缓存(TTL 默认 1 小时):

python
# 首次传入 prekey → 自动缓存
envelope, ok = e2ee.encrypt_message(..., prekey=fetched_prekey)

# 后续传 None → 自动复用缓存
envelope, ok = e2ee.encrypt_message(..., prekey=None)

# 手动管理
e2ee.cache_prekey("bob.agentid.pub", prekey_dict)
cached = e2ee.get_cached_prekey("bob.agentid.pub")
e2ee.invalidate_prekey_cache("bob.agentid.pub")

生成 Prekey

python
prekey_material = e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 自行上传到服务端
await my_rpc_call("message.e2ee.put_prekey", prekey_material)

群组 E2EE(GroupE2EEManager)

GroupE2EEManager 是群组 E2EE 的纯工具类,与 E2EEManager 平行。密码学和状态管理全自动,网络发送由调用方负责。

实例化

python
from aun_core.e2ee import GroupE2EEManager
from aun_core.keystore.file import FileKeyStore

group_e2ee = GroupE2EEManager(
    identity_fn=lambda: my_identity,
    keystore=FileKeyStore("~/.aun/myapp"),
)

建群后创建 epoch 并分发

python
info = group_e2ee.create_epoch(group_id, member_aids)
# info = {epoch, commitment, distributions: [{to, payload}, ...]}

for dist in info["distributions"]:
    # 通过 P2P E2EE 发送密钥分发消息
    envelope, _ = e2ee.encrypt_message(
        to_aid=dist["to"], payload=dist["payload"],
        peer_cert_pem=fetch_cert(dist["to"]),
    )
    await my_rpc_call("message.send", {
        "to": dist["to"], "payload": envelope,
        "type": "e2ee.encrypted", "encrypted": True,
        ...
    })

加密群消息

python
envelope = group_e2ee.encrypt(group_id, {"type": "text", "text": "hello"})
await my_rpc_call("group.send", {
    "group_id": group_id,
    "payload": envelope,
    "type": "e2ee.group_encrypted",
    "encrypted": True,
})

解密群消息

python
decrypted = group_e2ee.decrypt(raw_group_message)
# 内置防重放 + 外层 group_id/from/sender_aid 校验
# 非加密消息原样返回,解密失败返回 None

# 批量解密(用于 group.pull)
results = group_e2ee.decrypt_batch(messages)

处理 P2P 密钥消息

所有密钥协议消息(分发/请求/响应)通过 P2P E2EE 传输。收到后先 P2P 解密,再交给 handle_incoming

python
inner = e2ee.decrypt_message(p2p_message)   # P2P 层解密
result = group_e2ee.handle_incoming(inner["payload"])

if result == "distribution":
    pass  # 密钥已自动存储
elif result == "distribution_rejected":
    pass  # epoch 降级被拒
elif result == "response":
    pass  # 密钥恢复响应已存储
elif result == "request":
    # 需要回复:查成员列表 → 构建响应 → P2P 发送
    members = get_group_members(group_id)
    response = group_e2ee.handle_key_request_msg(inner["payload"], members)
    if response:
        p2p_e2ee_send(inner["payload"]["requester_aid"], response)

踢人后轮换

python
info = group_e2ee.rotate_epoch(group_id, remaining_member_aids)
for dist in info["distributions"]:
    p2p_e2ee_send(dist["to"], dist["payload"])

配合服务端 CAS 防脑裂(推荐):

python
# 1. 读当前 epoch
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
# 2. CAS 递增(服务端校验 admin/owner 角色 + 原子递增 + rotation_signature 验签)
cas = await my_rpc_call("group.e2ee.rotate_epoch", {
    "group_id": group_id, "current_epoch": status["epoch"],
    # Python SDK 自动附加 rotation_signature 和 rotation_timestamp
    # 裸客户端必须自行签名:sign("{group_id}|{current_epoch}|{new_epoch}|{aid}|{timestamp}")
})
if cas["success"]:
    info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], member_aids)
    for dist in info["distributions"]:
        p2p_e2ee_send(dist["to"], dist["payload"])
# CAS 失败说明别的 admin 先轮换了,放弃即可

加人后轮换并分发密钥

python
# 成员加入改变成员集,推荐先通过服务端 CAS 推进 epoch。
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
cas = await my_rpc_call("group.e2ee.rotate_epoch", {"group_id": group_id, "current_epoch": status["epoch"], ...})
if cas["success"]:
    info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], updated_member_aids)
    for dist in info["distributions"]:
        p2p_e2ee_send(dist["to"], dist["payload"])

解密失败时请求恢复

当前 Python SDK 优先请求本地成员列表中的第一个候选者;零状态时退化为请求当前消息发送者。

python
recovery = group_e2ee.build_recovery_request(
    group_id, epoch, sender_aid=msg.get("sender_aid"),
)
if recovery:
    p2p_e2ee_send(recovery["to"], recovery["payload"])
# 频率限制:同群同 epoch 30 秒内不重复请求

状态查询

python
group_e2ee.has_secret(group_id)        # 是否持有密钥
group_e2ee.current_epoch(group_id)     # 当前 epoch,无密钥返回 None
group_e2ee.get_member_aids(group_id)   # 已知成员列表
group_e2ee.load_all_secrets(group_id)  # {epoch: secret_bytes} 映射
group_e2ee.cleanup(group_id)           # 清理过期旧 epoch(默认保留 7 天)

自定义密钥存储

实现 KeyStore Protocol,可替换默认的文件存储:

python
class MyKeyStore:
    def load_key_pair(self, aid: str) -> dict | None: ...
    def save_key_pair(self, aid: str, key_pair: dict) -> None: ...
    def load_cert(self, aid: str) -> str | None: ...
    def save_cert(self, aid: str, cert_pem: str) -> None: ...
    def load_identity(self, aid: str) -> dict | None: ...
    def save_identity(self, aid: str, identity: dict) -> None: ...
    def load_metadata(self, aid: str) -> dict | None: ...
    def save_metadata(self, aid: str, metadata: dict) -> None: ...

client = AUNClient({"aun_path": "..."})

AUNClient 不接收外部 keystore 参数。若需替换 KeyStore,应按 SDK 内部接口规范扩展实现,再由对应语言 SDK 的内部装配层接入。


自定义敏感数据存储

实现 SecretStore Protocol。默认行为:Windows 使用 DPAPI,其他平台使用内存存储。

python
class MySecretStore:
    def protect(self, scope: str, name: str, plaintext: bytes) -> dict: ...
    def reveal(self, scope: str, name: str, record: dict) -> bytes | None: ...
    def clear(self, scope: str, name: str) -> None: ...

client = AUNClient({"aun_path": "..."})

AUNClient 构造函数只保留 aun_pathroot_ca_pathseed_passwordSecretStore / KeyStore / SQLiteBackup 属于 SDK 内部基础设施,不作为应用层构造参数暴露。


当前安全默认值

默认行为说明
P2P 消息默认要求发送方签名sender_signature 的消息被拒绝
群组消息默认要求发送方签名require_signature=True,无签名或无发送方证书的消息被拒绝
群组 E2EE 为固定启用能力group_e2ee=true,不可关闭
默认要求前向保密require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级
客户端操作签名group.send/group.kick/group.add_member/group.leave 等操作自动附加 client_signature,服务端强制验签

AUN Protocol Documentation