Skip to content

4.5 Worker 队列

学习目标

完成本节后,你将能够:

  1. 理解 worker/src/app.ts 如何把 queue contract、processor 和 BullMQ options 组装起来。
  2. 读懂 WorkerManager.register 的运行时职责。
  3. 判断一个后台任务应该如何定义 payload、producer、consumer、失败语义和验证。

4.5.1 先给结论

Worker 队列链路不是“写一个 processor”就结束。完整工作模型是:

text
shared queue contract -> producer add job -> Redis/BullMQ -> worker app registers consumer -> WorkerManager wraps metrics/error -> processor -> service/side effect

在这个模型里:

  • packages/shared/src/server/queues.ts 定义 job 的名字和 payload;
  • packages/shared/src/server/redis/** 提供 typed queue class 和 sharding;
  • producer 调用 queue .add(...)
  • worker/src/app.ts 根据 env flag 注册 processor;
  • WorkerManager 创建 BullMQ Worker,包装 metrics、OpenTelemetry、error/stalled hooks;
  • processor 只做 orchestration,复杂业务应下沉到 service。

4.5.2 总链路图

4.5.3 queues.ts:队列协议源头

源码:packages/shared/src/server/queues.ts

它分三层:

作用
payload schema用 Zod 描述 job data,例如 IngestionEventOtelIngestionEventDatasetQueueEventSchema
QueueName / QueueJobs固定 Redis/BullMQ 队列名和 job 名。
TQueueJobTypes从 queue name 映射到 { timestamp, id, payload, name, retryBaggage }

为什么集中定义?因为 producer 和 consumer 不在同一个运行时:

如果 producer 和 consumer 各自定义 payload,滚动部署时就容易出现新旧版本漂移。

4.5.4 shared Redis queue class:队列实例和 sharding

源码:packages/shared/src/server/redis/**

队列 class 负责创建 typed BullMQ queue,例如 ingestion:

  • IngestionQueue.getShardNames() 生成 shard queue names;
  • IngestionQueue.getInstance({ shardingKey }) 根据 sharding key 选择 shard;
  • SecondaryIngestionQueue 提供隔离队列;
  • OTel、eval、LLM-as-judge、code eval 等队列也有类似模式。

这层的意义是:producer 不直接 new Queue(...),而是通过 shared queue class 使用统一 Redis options、queue name 和 sharding 规则。

4.5.5 worker/src/app.ts:运行时装配

worker/src/app.ts 的核心不是业务处理,而是装配:

装配项例子
env gateQUEUE_CONSUMER_INGESTION_QUEUE_IS_ENABLEDQUEUE_CONSUMER_EVAL_EXECUTION_QUEUE_IS_ENABLED
queue shardIngestionQueue.getShardNames()EvalExecutionQueue.getShardNames()
processor builderingestionQueueProcessorBuilder(true)evalJobExecutorQueueProcessorBuilder(...)
BullMQ optionsconcurrencylimiterlockDurationstalledIntervalmaxStalledCount
scheduled queue instantiationexport、metering、retention 等定时任务会先 getInstance()
long-running cleanersproject cleaner、retention cleaner、deleted mask cleaner、queue metrics runner、monitor runner。

所以代码存在不代表当前部署一定在消费。是否运行要看 env flag。

4.5.6 WorkerManager.register 做了什么

源码:worker/src/queues/workerManager.ts

WorkerManager.register(queueName, processor, options) 做几件事:

  1. 防止重复注册同名 worker;
  2. createBullMQWorkerOptionsWithRedis 获取 Redis-backed worker options;
  3. 创建 BullMQ Worker
  4. 包装 processor,记录 wait time、processing time、queue depth、failed count、active count;
  5. 从 job payload 里提取 projectId,放入 OpenTelemetry / ClickHouse context;
  6. 监听 failederrorstalled 事件,记录日志、trace exception 和 metrics;
  7. 暴露 closeWorkersgetWorkergetRegisteredQueueNames

这说明 WorkerManager 是队列运行时的统一控制面,不是普通 factory。

4.5.7 concurrency、limiter、lock 的含义

配置作用何时重要
concurrency单个 worker 同时处理多少 job控制吞吐。
limiter.max/duration全局速率限制防止外部 API、ClickHouse delete、heavy job 被打爆。
lockDurationjob lock 多久没续约算可能 stalled长任务、CPU 抖动、外部调用慢时重要。
stalledInterval多久检查 stalled job降低频繁检查带来的噪声。
maxStalledCountstalled 后最多重试次数防止锁过期导致无限重复。

例如 eval execution、PostHog/Mixpanel integration、trace delete 等长任务会显式设置 lockDurationstalledIntervalmaxStalledCount

4.5.8 失败语义

Worker 里的错误不能一概 throw,也不能一概吞掉。

情况倾向处理
临时网络错误、S3 SlowDown、外部服务短暂不可用throw,让 BullMQ retry;必要时标记 secondary queue。
用户配置错误、API key 无效、目标资源不存在记录业务失败状态,job 可结束。
永久非法 payload不要无限 retry,记录错误并进入可观察状态。
大规模删除、导出、retention使用 limiter 和低 concurrency,保护 DB/ClickHouse。
job lock 可能过期调整 lockDurationstalledIntervalmaxStalledCount

以 ingestion 为例,S3 SlowDown 会 markProjectS3Slowdown 并让项目进入 secondary queue 路径;普通处理错误会 markProjectIngestFailure 后 throw。

4.5.9 新增 queue 的标准路线

  1. packages/shared/src/server/queues.ts 增加 payload schema、QueueNameQueueJobsTQueueJobTypes
  2. 如需 producer 统一创建 queue,在 packages/shared/src/server/redis/** 增加 queue class。
  3. 在 producer 处调用 .add(QueueJobs.X, { ... }),job id 和 sharding key 要稳定。
  4. worker/src/queues/** 增加 processor,复杂逻辑放 service。
  5. worker/src/app.ts 按 env flag 注册,设置 concurrency/limiter/locks。
  6. 为 rolling deploy 考虑 optional 字段和 fallback。
  7. 写测试覆盖 schema、producer、processor 成功/失败路径。

下一节

v4 Events 查询