5.3 Queue contracts
学习目标
完成本节后,你将能够:
- 读懂
QueueName、QueueJobs、payload schema、TQueueJobTypes的关系。 - 明白 queue contract 如何把
web、sharedproducer 和workerconsumer 连接起来。 - 新增 worker job 时考虑 rolling deploy、sharding、secondary queue 和失败语义。
5.3.1 先给结论
队列是跨进程协议,不是函数调用。
在 Langfuse 里,一条 job 的完整 contract 包括:
QueueName -> QueueJobs -> payload Zod schema -> TQueueJobTypes -> queue class -> worker registration -> processor如果只改 processor,不改 shared contract,producer 不知道怎么发;如果只改 producer,不改 worker 注册,job 没人消费;如果不考虑旧 job,新版本部署时会处理不了队列里的老 payload。
5.3.2 queues.ts 的三层结构
源码:packages/shared/src/server/queues.ts
| 层 | 代码形态 | 例子 |
|---|---|---|
| payload schema | z.object(...) 或 z.discriminatedUnion(...) | IngestionEvent、DatasetQueueEventSchema、BatchActionProcessingEventSchema |
| queue/job names | enum QueueName、enum QueueJobs | QueueName.IngestionQueue、QueueJobs.IngestionJob |
| type map | TQueueJobTypes | [QueueName.IngestionQueue]: { timestamp, id, payload, name } |
这个 type map 让 queue class 和 worker processor 可以拿到同一套 typed payload。
5.3.3 以 ingestion queue 为例
Producer
packages/shared/src/server/ingestion/processEventBatch.ts 会:
- 校验 batch;
- 按 entity 分组;
- 写 raw JSON 到 S3/blob;
- 构造 payload;
- 调用
IngestionQueue.getInstance({ shardingKey })?.add(...)。
Contract
IngestionEvent schema 包含:
| 字段 | consumer 如何使用 |
|---|---|
data.type | 映射 ClickHouse entity type。 |
data.eventBodyId | sharding、dedupe、legacy S3 prefix fallback。 |
data.fileKey | 直接下载具体 S3 文件。 |
data.skipS3List | OTel observation 等路径可跳过 S3 list。 |
data.forwardToEventsTable | 控制 v4 events 写入。 |
data.bucketPrefix | producer 写 S3 时的绝对 prefix,避免 consumer 重构路径漂移。 |
authCheck.scope.projectId | tenant scope。 |
Consumer
worker/src/queues/ingestionQueue.ts 会:
- 从 job payload 设置 span attributes;
- 优先使用
bucketPrefix,旧 job fallback 到rawEventBucketPrefix; - 用 Redis seen cache 跳过短时间重复处理;
- 根据 env/S3 SlowDown 标记转发 secondary queue;
- 从 S3 list 或直接下载 raw event;
- 恢复 canonical entity id;
- 调用
IngestionService.mergeAndWrite; - 出错时记录 ingest failure 并 throw 触发 retry。
这说明 queue payload 是 producer 和 consumer 的共同协议。
5.3.4 queue class 和 sharding
源码:packages/shared/src/server/redis/ingestionQueue.ts
队列 class 负责:
- 生成 shard queue names;
- 根据 sharding key 选择 shard;
- 创建 typed BullMQ Queue;
- 维护 singleton queue instance;
- 监听 queue error。
sharding 的目的不是类型好看,而是让高吞吐队列可以横向分摊负载。
5.3.5 worker registration
consumer 不是自动出现的。worker/src/app.ts 会按 env flag 注册:
IngestionQueue.getShardNames()
-> WorkerManager.register(shardName, ingestionQueueProcessorBuilder(true), options)secondary queue 也要单独注册。也就是说:
- queue name 在 shared;
- queue instance 在 shared redis class;
- processor 在 worker;
- 是否消费由 worker app/env 决定。
5.3.6 rolling deploy 兼容
队列里可能存在旧版本 job。新增字段时要考虑四种组合:
| producer | consumer | 风险 |
|---|---|---|
| old producer | new consumer | 新 consumer 不能假设新字段存在。 |
| new producer | old consumer | 新字段不能成为 old consumer 的必需字段。 |
| old job in Redis | new consumer | payload schema 要能 parse 旧 job。 |
| secondary queue forwarded job | consumer | 转发时要保留原 payload。 |
常见策略:
- 新字段先 optional;
- consumer 写 fallback;
- 注释说明兼容窗口;
- 队列耗尽后再收紧;
- schema test 覆盖新旧 payload。
bucketPrefix 就是这类字段:新 producer 写入,new consumer 优先使用;旧 job 没有它时,consumer fallback 到旧 prefix 构造逻辑。
5.3.7 新增 queue 的完整 checklist
| 步骤 | 文件 |
|---|---|
| 定义 payload schema | packages/shared/src/server/queues.ts |
| 增加 queue name/job name | QueueName / QueueJobs |
| 更新 type map | TQueueJobTypes |
| 创建 queue class | packages/shared/src/server/redis/** |
| 导出 queue class | packages/shared/src/server/index.ts |
| 新增 producer | web/shared/worker 里调用 .add(...) |
| 新增 processor | worker/src/queues/** |
| 注册 worker | worker/src/app.ts |
| 配置 env | worker env schema/example,如需要 |
| 测试 | payload schema、producer、processor、retry/failure |
5.3.8 常见错误
| 错误 | 后果 |
|---|---|
| producer 和 consumer 各自定义 payload | rolling deploy 或旧 job 解析失败。 |
| job 里塞完整大 payload | Redis 内存压力大,重放和调试困难。 |
| processor 直接写跨进程字段语义 | shared contract 失去唯一来源。 |
| 忘记注册 worker | job 会堆在 Redis。 |
| 忘记 secondary queue | 高吞吐/异常租户无法隔离。 |
| 把永久用户错误 throw | BullMQ 无限 retry 或 DLQ 噪声。 |