Skip to content

8.2 从零设计一个 Mini Ingestion Infra

学习目标

完成本节后,你将能够:

  1. 用 Langfuse 的设计模式搭一个简化版高吞吐事件写入系统。
  2. 理解为什么要拆 schema、raw storage、queue、worker、writer、query builder。
  3. 把每一步映射回本 repo 的真实实现。

8.2.1 教程路线

本节不是让你真的在当前 repo 新建一个 mini 系统,而是用“可实现的伪代码 + Langfuse 源码对照”训练 infra 拆解能力。

阅读时请固定一个动作:每看到一个缩小版模块,就立刻回答它在 Langfuse 里对应哪个文件、哪个契约、哪个运行时边界。

如果只记住文件名,这节就失败了;如果能解释“为什么要有这一层”,这节才有效。

8.2.2 需求:一个最小可用的事件平台

假设你要做一个简化版平台,支持:

  • SDK 上报 event;
  • UI 查询 event 列表;
  • event 可能很大,包含 input/output/metadata;
  • 写入量高,不能让 HTTP 请求等待 OLAP 数据库;
  • worker 可以重试;
  • 查询时默认不读完整大字段;
  • 多租户隔离必须可靠。

这已经足够接近 Langfuse 的核心 infra。

8.2.3 第一步:定义事件契约

缩小版可以这样定义:

ts
const EventSchema = z.object({
  id: z.string(),
  projectId: z.string(),
  type: z.enum(["trace", "observation", "score"]),
  timestamp: z.coerce.date(),
  body: z.record(z.string(), z.unknown()),
});

Langfuse 对照:

  • ingestion event schema 在 packages/shared/src/server/ingestion/types.ts
  • domain schema 在 packages/shared/src/domain/**
  • Public API request/response schema 在 web/src/features/public-api/types/**

设计要点:

要点为什么
runtime schemaSDK payload 是外部输入,TypeScript 类型不够。
projectId / auth scope多租户系统必须让每条写入带租户边界。
typeworker 需要按实体类型分派 merge 逻辑。
timestamp排序、合并、查询窗口都依赖它。

这一步对应的是“系统边界里的语言”。如果这层没有 runtime schema,后面每层都要处理脏数据;如果这层没有 tenant scope,后面每层都可能漏隔离。

8.2.4 第二步:HTTP 入口只做轻工作

缩小版入口不要直接写 OLAP:

text
POST /events
  -> verify api key
  -> parse body with Zod
  -> group by entity id
  -> upload raw JSON to blob storage
  -> enqueue job with pointer
  -> return accepted result

Langfuse 对照:

  • web/src/pages/api/public/ingestion.ts
  • web/src/features/public-api/server/withMiddlewares.ts
  • web/src/features/public-api/server/createAuthedProjectAPIRoute.ts
  • packages/shared/src/server/ingestion/processEventBatch.ts

关键设计:

不要做应该做
HTTP 中同步 insert ClickHouse先写 raw payload,再投递 job。
queue payload 放完整 event bodyqueue 只放 S3 pointer 和 scope。
每条 event 一个 S3 文件同一 entity 的同批 event 合并为一个对象。

入口层的原则是“少做、但必须做对”:auth、schema、scope、采样、幂等线索、raw payload 指针。这些是请求侧才能可靠建立的信息。

8.2.5 第三步:queue payload 是跨进程协议

缩小版 payload:

ts
const EventJob = z.object({
  data: z.object({
    type: z.enum(["trace", "observation", "score"]),
    entityId: z.string(),
    fileKey: z.string(),
    bucketPrefix: z.string().optional(),
  }),
  authCheck: z.object({
    scope: z.object({
      projectId: z.string(),
    }),
  }),
});

Langfuse 对照:

  • packages/shared/src/server/queues.ts
  • IngestionEvent.data.bucketPrefix
  • QueueName.IngestionQueue
  • QueueJobs.IngestionJob
  • TQueueJobTypes[QueueName.IngestionQueue]

为什么 bucketPrefix optional?因为 Redis 里可能有旧版本 producer 产生的 job。新 worker 必须能消费旧 payload。这就是 rolling deploy 兼容。

你做自己的 infra 时,queue payload 要当成外部 API 对待:它虽然只在内部 Redis 里流动,但它跨进程、跨版本、跨部署窗口。

8.2.6 第四步:worker 做协调,service 做业务

缩小版 worker:

text
consume job
  -> resolve bucketPrefix
  -> read raw JSON
  -> dedupe recently processed file
  -> maybe redirect to secondary queue
  -> call EventMergeService
  -> call BatchWriter.addToQueue

Langfuse 对照:

  • worker/src/app.ts 注册消费者;
  • worker/src/queues/workerManager.ts 统一 metrics/error/stalled;
  • worker/src/queues/ingestionQueue.ts 协调 S3、Redis、secondary queue;
  • worker/src/services/IngestionService/index.ts 做实体合并;
  • worker/src/services/ClickhouseWriter/index.ts 做批量写入。

这里的分层很关键:processor 不应该变成所有业务规则的垃圾桶。它应该处理运行时协调;业务转换进入 service;写入策略进入 writer。

Langfuse 的 ingestion processor 就是这个模式:它处理 S3、Redis、secondary queue、span attributes 和错误标记;真正的 trace/observation/score merge 进入 IngestionService

8.2.7 第五步:写入器集中处理 OLAP 写入策略

缩小版 writer 至少要支持:

  • 按表维护队列;
  • batch size 触发 flush;
  • interval 触发 flush;
  • retry/backoff;
  • oversized payload 处理;
  • 失败指标。

Langfuse 对照 ClickhouseWriter

能力真实实现
单例ClickhouseWriter.getInstance()
按表队列queue[TableName.Traces]queue[TableName.EventsFull]
定时 flushsetInterval(... flushAll())
批量 insertformat: "JSONEachRow"
retryexponential-backoff
大字段保护string length split、size error truncation
成本字段保护Decimal64 clamp
失败可见rows_dropped metrics 和 dropped ids log

如果你把这些写在每个 worker processor 里,后续每个后台任务都会重复踩坑。

写入器是系统的“写入策略集中点”。这里决定的是吞吐、延迟、失败隔离和数据丢弃可见性,不是普通 DAO。

8.2.8 第六步:查询默认走轻量投影

缩小版查询不要默认读完整 input/output/metadata

text
list page query
  -> filter/order/page on lightweight table
  -> return list rows
  -> when detail opens, fetch full IO by ids

Langfuse 对照:

  • events_core:轻量查询投影;
  • events_full:完整 I/O 和 metadata;
  • EventsQueryBuilder.needsFullTable():决定读 core 还是 full;
  • buildEventsFullTableSplitQuery():先用 core filter/order/limit,再回 full 拿大字段;
  • eventsService.getEventList():列表默认 selectIOAndMetadata: false

这一步能直接降低列表页查询成本。

对初学者来说,这也是最容易忽略的一点:写入表结构不等于查询结构。高吞吐 infra 常常需要“写入完整事实 + 查询轻量投影”两套形状。

8.2.9 第七步:把 UI filter 做成共享语言

缩小版不要让前端拼 SQL,也不要让每个页面发明自己的 filter 形状。

Langfuse 的做法是:

  1. UI / Search Bar 产生 FilterState
  2. FilterState 由 Zod schema 校验;
  3. 后端 createFilterFromFilterState lower 成 ClickHouse SQL;
  4. column mapping 决定 UI column 对应哪个物理字段。

Langfuse 对照:

  • packages/shared/src/interfaces/filters.ts
  • packages/shared/src/server/queries/clickhouse-sql/factory.ts
  • web/src/features/search-bar/README.md
  • web/src/features/events/config/filter-config.ts

这层是“查询语言契约”。它把 UI 列、URL 状态、后端 operator、ClickHouse 物理字段连接起来。没有这层,功能会在多个页面里复制,tenant filter 和字段映射也更容易漏。

8.2.10 最小验收清单

如果你照这个模式做自己的 infra,至少要能回答:

  1. 外部请求在哪里完成 auth 和 tenant scope?
  2. 外部 payload 是否经过 runtime schema?
  3. queue payload 是否能兼容 rolling deploy?
  4. worker 失败是 retry、DLQ、降级还是记录状态?
  5. raw payload 是否可重放?
  6. OLAP 写入是否批量化?
  7. list query 是否避免默认读取大字段?
  8. filter 是否是共享语言,而不是页面私有状态?

能回答这些问题,才说明你学到的是 infra 设计,而不是复制了 Langfuse 的目录。

8.2.11 反向对照:为什么 Langfuse 更复杂

mini infra 只保留一条 event 链路;Langfuse 真实 repo 还要处理更多维度:

复杂度Langfuse 为什么需要
多实体类型trace、observation、score、dataset_run_item 的 merge 规则不同。
legacy + v4 双路径迁移期间要同时支持 legacy ClickHouse 表和 v4 events 表。
OTel 摄取OTel span 的输入语义和普通 SDK event 不完全一样。
secondary queue高吞吐项目或 S3 SlowDown 场景需要隔离,避免拖慢主队列。
Search Bar 语法分析型 UI 需要 URL 可分享、可校验、可 lower 的筛选语言。
Public API contract外部用户依赖稳定字段,所以 Fern/generated client 要同步。
多租户权限UI session、API key、worker job 都必须携带 project scope。

这也是学习这个 repo 的价值:它不是一个玩具 CRUD,而是一个真实的多运行时、多存储、多租户、高吞吐 observability infra。

下一篇

源码索引