Skip to content

4.4 Ingestion 链路

学习目标

完成本节后,你将能够:

  1. 从 SDK/OTel 上报追到 ClickHouse 写入。
  2. 理解为什么 ingestion 不同步写 ClickHouse。
  3. 说清楚 S3、Redis、worker、IngestionServiceClickhouseWriter 如何组合。

4.4.1 先给结论

Langfuse ingestion 的设计不是“收到 event -> 直接 insert”。真实链路是:

  1. 请求侧只做校验、排序、分组、采样、raw event 上传和轻量 job 投递;
  2. worker 侧做 S3 读取、去重、secondary queue 分流、实体合并、数据补全;
  3. ClickHouse 写入侧再做按表缓冲、批量 insert、重试、截断和失败计数。

这个拆分是为了高吞吐、可重试、可批量、可重放,并避免用户请求直接承担 ClickHouse 写入延迟和失败。

4.4.2 链路图

4.4.3 请求侧:processEventBatch 做什么

源码:packages/shared/src/server/ingestion/processEventBatch.ts

阶段代码意图为什么重要
记录 metrics/spanrecordIncrementrecordDistribution、span attributesingestion 是高吞吐入口,必须能观察 batch size 和 project scope。
校验 project scopeauthCheck.scope.projectId防止无租户归属的 event 进入数据面。
Zod 校验 eventcreateIngestionEventSchemaSDK payload 是外部输入,必须先变成内部可处理事件。
权限判断isAuthorizedscore-only key 只能创建 score,project key 才能写完整事件。
排序sortBatchcreate 事件先于 update 事件,降低同批次 merge 的歧义。
分组entityType + event.body.id同一 trace/observation/score 的多个事件合并成一个 S3 对象和一个 job。
路径安全safeBlobKeySegmentsafeBlobFilenameStembuildEventBucketPrefix防止用户 id 影响 S3 路径结构,也让 producer/consumer 对路径有明确契约。
raw uploaduploadJson(bucketPath, data)保存原始事件,worker 可以重放、合并、批量处理。
enqueueIngestionQueue.getInstance({ shardingKey }).add(...)job 只带指针和轻量 metadata,不塞完整 body。

这里最值得注意的是 bucketPrefix。它由 producer 计算后放入 queue payload,consumer 优先使用它,而不是用 worker 本地 env 重新拼路径。这是跨进程协议设计,不是普通字段。

4.4.4 队列契约:producer 和 consumer 怎么对齐

源码:packages/shared/src/server/queues.ts

IngestionEvent 定义了 producer 和 consumer 共同理解的 payload:

字段作用
data.type判断 trace、observation、score、dataset_run_item 等实体类型。
data.eventBodyId实体 id,用于 sharding、dedupe 和 merge。
data.fileKeyS3 文件名 stem,让 worker 找到 raw JSON。
data.skipS3ListOTel observation 或特定项目可以直接读单文件,减少 list 成本。
data.forwardToEventsTable控制是否写入 v4 events staging/full path。
data.bucketPrefixproducer 写 S3 的绝对前缀,避免 consumer 重构路径漂移。
authCheck.scope.projectIdtenant scope,worker 后续写入必须带它。

这类字段一旦变更,就不是“改一个类型”那么简单。它影响 rolling deploy、历史 job、secondary queue 和 worker 兼容。

4.4.5 worker 侧:ingestionQueueProcessorBuilder 做什么

源码:worker/src/queues/ingestionQueue.ts

阶段代码意图设计原因
span attributes把 job id、projectId、eventBodyId、type、fileKey 放入 tracing失败时能从 job 追到租户和实体。
获取 ClickhouseWriterClickhouseWriter.getInstance()worker 内共享批量写入器。
解析 S3 prefix优先 payload.data.bucketPrefix,旧 job fallback 到 raw prefix兼容 rolling deploy 和老队列。
Redis seen cachelangfuse:ingestion:recently-processed:*防止短时间重复处理同一文件。
secondary queue 分流env allowlist 或 S3 SlowDown flag高吞吐/异常项目可以隔离到 secondary queue。
S3 读取skipS3List 走单文件,否则 list prefix 后并发下载在成本和完整性之间取舍。
设置 seen keys下载成功后写 Redis 5 分钟窗口避免快速更新导致重复消费。
计算 forwardToEventsTablepayload 优先,env fallback支持迁移期开关和双写。
恢复 canonical id从 event body 恢复 raw id,而不是用 sanitized path id防止 S3 安全路径影响 ClickHouse row id。
调用 IngestionServicemergeAndWrite(...)processor 不直接做业务合并。

这里的 processor 像“运行时协调器”:它处理 job、S3、Redis、secondary queue 和错误语义,但不应该把 ClickHouse record 合并逻辑散落在 processor 里。

4.4.6 业务合并:IngestionService 才是数据转换核心

源码:worker/src/services/IngestionService/index.ts

IngestionService.mergeAndWrite 按实体类型分派:

实体方法关键行为
traceprocessTraceEventList合并 trace create/update;补 input/output;写 traces;必要时写 staging observation;触发 TraceUpsertQueue 给 eval 使用。
observationprocessObservationEventList合并 observation;补 prompt、tool calls、usage、cost;必要时创建 wrapper trace;写 observations 和 staging。
scoreprocessScoreEventList校验/补全 score;读取旧 ClickHouse record;合并后写 scores
dataset_run_itemprocessDatasetRunItemEventList从 Postgres 补 dataset run/item 信息;写 ClickHouse dataset run item 表。

它的核心模式是:

  1. 将同一实体的事件按时间排序;
  2. 读取 ClickHouse 中可能已经存在的旧 record;
  3. 用不可变字段规则保护 idproject_id、timestamp 等字段;
  4. 让新事件覆盖可变字段;
  5. 做模型、usage、cost、prompt、tool metadata 等补全;
  6. 用 Zod/record schema 校验最终 insert record;
  7. 交给 ClickhouseWriter,而不是自己 insert。

这就是 ingestion 的“业务内核”。

4.4.7 写入器:ClickhouseWriter 为什么单独存在

源码:worker/src/services/ClickhouseWriter/index.ts

ClickhouseWriter 是 worker 内的批量写入缓冲层:

机制代码行为设计原因
单例ClickhouseWriter.getInstance()多个 processor 共用同一批量写入队列。
按表队列queue[TableName.Traces]不同表有不同 record schema,批量写入要分表。
双触发 flushbatch size 到达或 interval 到达在吞吐和延迟之间折中。
JSONEachRowclickhouseClient().insert({ format: "JSONEachRow" })ClickHouse 批量写入友好。
retry/backoffsocket hang up 等可重试错误会 retry网络抖动不应直接丢数据。
超大字段保护string length / size error 时拆批或截断防止单条异常 payload 拖垮整批。
Decimal clampcost 字段超出 Decimal64 时限幅防止非法成本值导致整批失败。
drop metricsmax attempts 后记录 dropped rows失败不是静默发生。

如果把这些逻辑写在每个 processor 里,所有后台任务都会重复处理批量、重试、截断和指标。独立的 writer 是为了把 ClickHouse 写入策略集中起来。

4.4.8 为什么是 S3 + Redis + worker + ClickHouse

方案问题
HTTP 请求里直接写 ClickHouse请求延迟和 ClickHouse 抖动直接暴露给 SDK;无法批量;难重试。
Redis job 里塞完整 event bodypayload 大、成本高、队列内存压力大、重放不方便。
每个 event 单独写 S3 和 job小文件和 job 数过多,S3/list/queue 成本高。
worker 直接 insert 单条ClickHouse 写入吞吐低,失败处理散落。

当前设计的组合是:

  • S3/blob 保存原始事件正文;
  • Redis/BullMQ 只保存轻量 job 和调度状态;
  • worker 做异步合并、重试和隔离;
  • IngestionService 做业务转换;
  • ClickhouseWriter 做统一批量写入;
  • ClickHouse 承载最终分析查询。

这是一条面向高吞吐事件平台的写入路径。

4.4.9 关键不变量

  1. producer 和 consumer 必须共享同一 queue payload schema。
  2. worker 不能随意重构 producer 的 S3 路径;优先使用 payload 里的 bucketPrefix
  3. ClickHouse row id 应来自 event body 的 canonical id,而不是 sanitized S3 path。
  4. IngestionService 负责实体级合并,processor 只做运行时协调。
  5. ClickHouse 写入必须走 ClickhouseWriter,不要在 processor 里散落 insert。
  6. tenant scope 必须从 authCheck 带到每一次写入。
  7. 采样、secondary queue、skip S3 list、v4 dual-write 都是运行时策略,不是独立业务分支。

下一节

Worker 队列