Skip to content

5.3 Queue contracts

学习目标

完成本节后,你将能够:

  1. 读懂 QueueNameQueueJobs、payload schema、TQueueJobTypes 的关系。
  2. 明白 queue contract 如何把 webshared producer 和 worker consumer 连接起来。
  3. 新增 worker job 时考虑 rolling deploy、sharding、secondary queue 和失败语义。

5.3.1 先给结论

队列是跨进程协议,不是函数调用。

在 Langfuse 里,一条 job 的完整 contract 包括:

text
QueueName -> QueueJobs -> payload Zod schema -> TQueueJobTypes -> queue class -> worker registration -> processor

如果只改 processor,不改 shared contract,producer 不知道怎么发;如果只改 producer,不改 worker 注册,job 没人消费;如果不考虑旧 job,新版本部署时会处理不了队列里的老 payload。

5.3.2 queues.ts 的三层结构

源码:packages/shared/src/server/queues.ts

代码形态例子
payload schemaz.object(...)z.discriminatedUnion(...)IngestionEventDatasetQueueEventSchemaBatchActionProcessingEventSchema
queue/job namesenum QueueNameenum QueueJobsQueueName.IngestionQueueQueueJobs.IngestionJob
type mapTQueueJobTypes[QueueName.IngestionQueue]: { timestamp, id, payload, name }

这个 type map 让 queue class 和 worker processor 可以拿到同一套 typed payload。

5.3.3 以 ingestion queue 为例

Producer

packages/shared/src/server/ingestion/processEventBatch.ts 会:

  1. 校验 batch;
  2. 按 entity 分组;
  3. 写 raw JSON 到 S3/blob;
  4. 构造 payload;
  5. 调用 IngestionQueue.getInstance({ shardingKey })?.add(...)

Contract

IngestionEvent schema 包含:

字段consumer 如何使用
data.type映射 ClickHouse entity type。
data.eventBodyIdsharding、dedupe、legacy S3 prefix fallback。
data.fileKey直接下载具体 S3 文件。
data.skipS3ListOTel observation 等路径可跳过 S3 list。
data.forwardToEventsTable控制 v4 events 写入。
data.bucketPrefixproducer 写 S3 时的绝对 prefix,避免 consumer 重构路径漂移。
authCheck.scope.projectIdtenant scope。

Consumer

worker/src/queues/ingestionQueue.ts 会:

  1. 从 job payload 设置 span attributes;
  2. 优先使用 bucketPrefix,旧 job fallback 到 rawEventBucketPrefix
  3. 用 Redis seen cache 跳过短时间重复处理;
  4. 根据 env/S3 SlowDown 标记转发 secondary queue;
  5. 从 S3 list 或直接下载 raw event;
  6. 恢复 canonical entity id;
  7. 调用 IngestionService.mergeAndWrite
  8. 出错时记录 ingest failure 并 throw 触发 retry。

这说明 queue payload 是 producer 和 consumer 的共同协议。

5.3.4 queue class 和 sharding

源码:packages/shared/src/server/redis/ingestionQueue.ts

队列 class 负责:

  • 生成 shard queue names;
  • 根据 sharding key 选择 shard;
  • 创建 typed BullMQ Queue;
  • 维护 singleton queue instance;
  • 监听 queue error。

sharding 的目的不是类型好看,而是让高吞吐队列可以横向分摊负载。

5.3.5 worker registration

consumer 不是自动出现的。worker/src/app.ts 会按 env flag 注册:

text
IngestionQueue.getShardNames()
  -> WorkerManager.register(shardName, ingestionQueueProcessorBuilder(true), options)

secondary queue 也要单独注册。也就是说:

  • queue name 在 shared;
  • queue instance 在 shared redis class;
  • processor 在 worker;
  • 是否消费由 worker app/env 决定。

5.3.6 rolling deploy 兼容

队列里可能存在旧版本 job。新增字段时要考虑四种组合:

producerconsumer风险
old producernew consumer新 consumer 不能假设新字段存在。
new producerold consumer新字段不能成为 old consumer 的必需字段。
old job in Redisnew consumerpayload schema 要能 parse 旧 job。
secondary queue forwarded jobconsumer转发时要保留原 payload。

常见策略:

  • 新字段先 optional;
  • consumer 写 fallback;
  • 注释说明兼容窗口;
  • 队列耗尽后再收紧;
  • schema test 覆盖新旧 payload。

bucketPrefix 就是这类字段:新 producer 写入,new consumer 优先使用;旧 job 没有它时,consumer fallback 到旧 prefix 构造逻辑。

5.3.7 新增 queue 的完整 checklist

步骤文件
定义 payload schemapackages/shared/src/server/queues.ts
增加 queue name/job nameQueueName / QueueJobs
更新 type mapTQueueJobTypes
创建 queue classpackages/shared/src/server/redis/**
导出 queue classpackages/shared/src/server/index.ts
新增 producerweb/shared/worker 里调用 .add(...)
新增 processorworker/src/queues/**
注册 workerworker/src/app.ts
配置 envworker env schema/example,如需要
测试payload schema、producer、processor、retry/failure

5.3.8 常见错误

错误后果
producer 和 consumer 各自定义 payloadrolling deploy 或旧 job 解析失败。
job 里塞完整大 payloadRedis 内存压力大,重放和调试困难。
processor 直接写跨进程字段语义shared contract 失去唯一来源。
忘记注册 workerjob 会堆在 Redis。
忘记 secondary queue高吞吐/异常租户无法隔离。
把永久用户错误 throwBullMQ 无限 retry 或 DLQ 噪声。

下一节

FilterState