Skip to content

5.2 Domain + Zod

学习目标

完成本节后,你将能够:

  1. 区分 domain schema、API schema、queue schema、DB record schema。
  2. 理解 Zod 在 Langfuse 里为什么是运行时边界,不只是 TypeScript 类型来源。
  3. 判断数据转换应该发生在哪个边界。

5.2.1 先给结论

Langfuse 的 Zod schema 不只是为了 z.infer。它承担的是跨边界数据校验:

text
外部 JSON -> API/ingestion schema -> domain schema -> DB record schema -> API/UI response

在平台型系统里,TypeScript 类型只能保护编译期。SDK payload、HTTP body、Redis job、ClickHouse row、Postgres row 都是在运行时到达的,所以需要 Zod 把“不可信输入”变成“内部可处理数据”。

5.2.2 四类 schema

类型位置例子作用
Domain schemapackages/shared/src/domain/**TraceDomainObservationSchemaScoreSchema表示业务概念的稳定形状。
API schemaweb/src/features/public-api/types/**、route configquery/body/response schema保护外部 API contract。
Queue schemapackages/shared/src/server/queues.tsIngestionEventOtelIngestionEvent保护 producer/consumer 跨进程协议。
DB record schemapackages/shared/src/server/repositories/definitions.tsobservationRecordInsertSchemascoreRecordReadSchema保护 ClickHouse row 的读写形状。

这些 schema 解决的是不同层的问题,不应该互相替代。

5.2.3 Domain schema:业务语义

源码:

  • packages/shared/src/domain/traces.ts
  • packages/shared/src/domain/observations.ts
  • packages/shared/src/domain/scores.ts

典型模式:

ts
export const TraceDomain = z.object({
  id: z.string(),
  name: z.string().nullable(),
  timestamp: z.date(),
  environment: z.string(),
  tags: z.array(z.string()),
  projectId: z.string(),
});

export type TraceDomain = z.infer<typeof TraceDomain>;

domain schema 表达的是产品概念,例如:

  • observation 有 typelevelstartTimeendTimeinputoutput、usage/cost;
  • score 有 source、dataType、trace/session/dataset/observation 关联;
  • trace 有 session、user、metadata、tags、bookmarked/public。

注意:domain schema 不等于 DB row。domain 用 camelCase 和 JS Date;ClickHouse record 用 snake_case、epoch number 或 ClickHouse date string。

5.2.4 API schema:外部承诺

Public API 的 schema 包含 query/body/response。它们的职责是:

  • 过滤掉外部请求里不合法的字段;
  • 给 SDK 和用户稳定响应;
  • 在 development 环境发现 response schema drift;
  • 和 Fern contract 对齐。

例如 createAuthedProjectAPIRoute 要求每条 route 显式传 responseSchema。这不是多余校验,而是外部 contract 的运行时保护。

5.2.5 Queue schema:跨进程协议

packages/shared/src/server/queues.ts 里的 schema 面向 Redis/BullMQ job。

IngestionEvent 为例:

text
data.type
data.eventBodyId
data.fileKey
data.skipS3List
data.forwardToEventsTable
data.bucketPrefix
authCheck.scope.projectId

producer 在 request side 或 shared service;consumer 在 worker。它们不共享内存,只共享 payload。所以 queue schema 必须能回答:

  • consumer 如何找到 S3 文件?
  • tenant scope 从哪里来?
  • 是否跳过 S3 list?
  • 是否写入 v4 events table?
  • rolling deploy 中旧 job 没有新字段时怎么办?

这就是为什么 queue 字段新增常常先 optional(),再在 consumer 写 fallback。

5.2.6 DB record schema:读写形状

源码:packages/shared/src/server/repositories/definitions.ts

ClickHouse record schema 常分成 read 和 insert:

schema形状原因
observationRecordInsertSchematimestamp 字段是 number,适合写入worker/ClickhouseWriter 插入 ClickHouse。
observationRecordReadSchematimestamp 字段是 ClickHouse string,再 transform 成 ISO datetimeClickHouse client 读取时返回字符串。
scoreRecordInsertSchema / scoreRecordReadSchemascore row 写入/读取形状不同读写边界分离。
eventsObservationRecordReadSchemaevents table 额外带 user/session/trace denormalized 字段v4 events 查询需要更多上下文。

这说明 Zod schema 也在处理物理存储差异。不要假设 ClickHouse row 和 domain object 一样。

5.2.7 数据转换边界

常见转换:

转换发生位置
外部 JSON -> ingestion eventprocessEventBatch、ingestion schema。
Public API body -> route inputcreateAuthedProjectAPIRoute
loose event list -> strict ClickHouse recordIngestionService
ClickHouse row -> domain/read modelrepositories 和 read schema。
domain/read model -> client-safe responsepublic API response schema 或 UI domain converter。

边界越清楚,越容易测试和迁移。

5.2.8 改 schema 时的风险面

改动要同步检查
新增 domain 字段API response、UI display、DB record、tests。
新增 ingestion 字段SDK payload、Zod schema、S3 raw event、worker merge、ClickHouse writer。
新增 queue 字段producer、consumer、old jobs、secondary queue、DLQ retry。
新增 ClickHouse 列migrations、insert schema、read schema、query field map。
新增 Public API response fieldresponseSchema、Fern、generated client、docs。
改 score/observation enumfilters、search-bar field registry、query builder、UI labels。

5.2.9 自检清单

  • 这个 schema 是 API、domain、queue 还是 DB record?
  • 输入是否来自不可信边界?
  • 是否需要 runtime parse,而不只是 TS type?
  • 是否跨进程或跨部署版本?
  • 是否有旧数据或旧 job 缺字段?
  • 读 schema 和写 schema 是否应该分开?
  • Public API 是否需要同步 Fern?

下一节

Queue contracts