DDD-027:事件溯源(Event Sourcing)

发布时间:2026/6/25 14:23:02
DDD-027:事件溯源(Event Sourcing) DDD-027:事件溯源(Event Sourcing)本章导读事件溯源(Event Sourcing)是一种革命性的数据持久化范式,它不再存储对象的当前状态,而是存储导致当前状态的所有事件。每一个业务操作都以事件的形式被记录,通过回放这些事件可以重建任意时刻的系统状态。本章将深入探讨事件溯源的核心原理、实现方式以及与 CQRS 的结合应用。学习目标理解事件溯源的核心思想及其与传统 CRUD 的本质区别掌握事件溯源的关键要素:事件存储、聚合重建、快照机制学会在 Java 项目中实现事件溯源模式前置知识DDD 聚合与领域事件基础CQRS 架构模式数据库事务与并发控制基础阅读时长约 60-75 分钟【原理】事件溯源核心思想与设计原理一、事件溯源的本质与定义1.1 什么是事件溯源【原理】Event Sourcing(事件溯源)由 Martin Fowler 提出,是一种将系统状态变更记录为不可变事件序列的架构模式。核心思想:不存储对象的当前状态,而是存储导致该状态的所有事件。通过回放事件流,可以重建任意时刻的状态。传统 CRUD 模式: ┌─────────────────────────────────────────────────────────┐ │ 状态存储 │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ orders 表 │ │ │ ├─────────────────────────────────────────────────┤ │ │ │ id | customer_id | status | total | updated_at │ │ │ │-----|-------------|--------|-------|------------│ │ │ │ 001 | CUST-001 | PAID | 1000 | 2025-01-15 │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ 问题:历史状态丢失,只保留最终状态 │ │ 无法回答:订单经历过哪些状态?谁修改了?为什么修改? │ └─────────────────────────────────────────────────────────┘ 事件溯源模式: ┌─────────────────────────────────────────────────────────┐ │ 事件存储 │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ events 表 │ │ │ ├─────────────────────────────────────────────────┤ │ │ │ aggregate_id | version | event_type | data │ │ │ │--------------|---------|---------------|---------│ │ │ │ ORDER-001 | 1 | OrderCreated | {...} │ │ │ │ ORDER-001 | 2 | ItemAdded | {...} │ │ │ │ ORDER-001 | 3 | OrderPaid | {...} │ │ │ │ ORDER-001 | 4 | OrderShipped | {...} │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ 优势:完整保留历史,可重建任意时刻状态 │ │ 可以回答:订单完整生命周期、每次变更的原因和时间 │ └─────────────────────────────────────────────────────────┘核心原则:原则说明事件不可变已发生的事件不能修改或删除事件有序事件按发生顺序组成事件流状态即事件流当前状态 = 初始状态 + 所有事件的应用结果追加写入新事件只能追加,不能覆盖【历史架构问题】问题 1:状态丢失,无法追溯历史// ❌ 传统 CRUD:只存储最终状态,历史信息丢失@Entity@Table(name="orders")publicclassOrderEntity{@IdprivateStringid;privateStringstatus;// 只有当前状态:CREATED/PAID/SHIPPEDprivateBigDecimaltotal;// 只有当前金额// 问题:// 1. 订单什么时候创建的?被谁创建的?// 2. 订单状态何时变更为 PAID?// 3. 订单金额是否被修改过?// 4. 为什么订单从 CREATED 变成 PAID 又变回 CREATED?// 这些问题都无法从当前状态中得知}问题 2:审计日志分散且不完整// ❌ 传统审计方案:日志分散,难以关联// 方案 1:数据库字段记录@EntitypublicclassOrderEntity{privateStringcreatedBy;privateLocalDateTimecreatedAt;privateStringupdatedBy;privateLocalDateTimeupdatedAt;// 问题:只能记录最后一次修改,之前的修改历史丢失}// 方案 2:独立的审计日志表@EntitypublicclassAuditLogEntity{privateStringtableName;privateStringrecordId;privateStringoperation;// INSERT/UPDATE/DELETEprivateStringoldValue;// JSONprivateStringnewValue;// JSONprivateStringoperator;privateLocalDateTimeoperatedAt;}// 问题:// 1. 审计逻辑与业务逻辑分离,容易遗漏// 2. oldValue/newValue 记录的是完整对象,难以看出具体哪个字段变化// 3. 业务含义不清晰,只有技术层面的 CRUD 记录// 4. 查询和重建历史状态非常复杂问题 3:复杂的业务规则难以表达// ❌ 传统状态管理:业务规则分散在各个 Service 中@ServicepublicclassOrderService{@AutowiredprivateOrderRepositoryorderRepository;publicvoidpayOrder(StringorderId,Paymentpayment){Orderorder=orderRepository.findById(orderId);// 业务规则分散:什么条件下可以支付?if(order.getStatus()!=OrderStatus.CREATED){thrownewBusinessException("只有待支付订单才能支付");}// 状态变更逻辑分散:支付后应该做什么?order.setStatus(OrderStatus.PAID);order.setPaymentId(payment.getId());order.setPaidAt(LocalDateTime.now());// 问题:// 1. 业务规则分散在各个方法中,难以统一管理// 2. 状态变更的"原因"没有被记录// 3. 复杂的状态机逻辑难以维护}}问题 4:时间旅行查询困难// ❌ 传统方案:无法查询历史状态// 需求:查询订单在某时刻的状态publicOrdergetOrderAtTime(StringorderId,LocalDateTimeatTime){// 传统方案无法实现,因为没有存储历史状态// 只能返回当前状态thrownewUnsupportedOperationException("不支持历史状态查询");}// 需求:统计某段时间内状态变更情况publicListStateChangegetStateChanges(StringorderId,LocalDateTimefrom,LocalDateTimeto){// 需要额外的审计表,且数据可能不完整// ...}【DDD 如何解决】事件溯源的核心思路:┌──────────────────────────────────────────────────────────────────┐ │ 事件溯源架构 │ │ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 命令处理 │ │ │ │ │ │ │ │ Command ──▶ CommandHandler ──▶ Aggregate ──▶ Event │ │ │ │ │ │ │ │ 1. 加载聚合(从事件流重建) │ │ │ │ 2. 执行业务方法 │ │ │ │ 3. 生成事件 │ │ │ │ 4. 保存事件 │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 事件存储 │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ │ │ Event Store │ │ │ │ │ ├──────────────────────────────────────────────────────┤ │ │ │ │ │ aggregate_id | version | event_type | event_data │ │ │ │ │ │ ORDER-001 | 1 | OrderCreated | {...} │ │ │ │ │ │ ORDER-001 | 2 | ItemAdded | {...} │ │ │ │ │ │ ORDER-001 | 3 | OrderPaid | {...} │ │ │ │ │ └──────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ 特点: │ │ │ │ - 只追加,不修改 │ │ │ │ - 事件不可变 │ │ │ │ - 有序存储 │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 事件发布 │ │ │ │ │ │ │ │ Event ──▶ EventPublisher ──▶ Subscribers │ │ │ │ │ │ │ │ 1. 更新读模型(CQRS) │ │ │ │ 2. 触发下游业务 │ │ │ │ 3. 发送通知 │ │ │ └────────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────┘关键设计:设计点传统 CRUD事件溯源存储内容当前状态状态变更事件写入方式覆盖更新追加写入数据完整性只保留最终状态完整历史审计能力需要额外实现天然支持历史查询困难或不可能轻松支持并发控制乐观锁/悲观锁乐观锁(版本号)二、事件溯源的基本要素2.1 领域事件【原理】事件溯源中的事件是已发生事实的记录,具有以下特征:特征说明不可变事件代表已发生的事实,不能被修改或删除过去时态命名使用过去时:OrderCreated、PaymentProcessed自描述事件包含完整的信息,不需要额外查询有序性每个事件有版本号,保证顺序事件结构: ┌─────────────────────────────────────────────────────┐ │ DomainEvent │ ├─────────────────────────────────────────────────────┤ │ + eventId: String // 事件唯一标识 │ │ + aggregateId: String // 聚合根 ID │ │ + aggregateType: String // 聚合类型 │ │ + version: int // 版本号(乐观锁) │ │ + eventType: String // 事件类型 │ │ + eventData: JSON // 事件数据 │ │ + occurredOn: DateTime // 发生时间 │ │ + metadata: Map // 元数据(操作人等) │ └─────────────────────────────────────────────────────┘【代码示例】// ✅ 事件溯源中的领域事件设计// 事件基类publicabstractclassDomainEvent{privatefinalStringeventId;privatefinalStringaggregateId;privatefinalStringaggregateType;privatefinalintversion;privatefinalInstantoccurredOn;privatefinalMapString,Objectmetadata;protectedDomainEvent(StringaggregateId,StringaggregateType,intversion){this.eventId=UUID.randomUUID().toString();this.aggregateId=aggregateId;this.aggregateType=aggregateType;this.version=version;this.occurredOn=Instant.now();this.metadata=newHashMap();}// Getters...publicabstractStringgetEventType();publicDomainEventwithMetadata(Stringkey,Objectvalue){this.metadata.put(key,value);returnthis;}}// 订单创建事件publicclassOrderCreatedEventextendsDomainEvent{privatefinalStringcustomerId;privatefinalListOrderItemDataitems;privatefinalBigDecimaltotalAmount;privatefinalStringshippingAddress;publicOrderCreatedEvent(StringorderId,StringcustomerId,ListOrderItemDataitems,BigDecimaltotalAmount,StringshippingAddress){super(orderId,"Order",1);this.customerId=customerId;this.items=items;this.totalAmount=totalAmount;this.shippingAddress=shippingAddress;}@OverridepublicStringgetEventType(){return"OrderCreated";}// 自描述:包含完整信息,不需要额外查询publicrecordOrderItemData(StringproductId,StringproductName,intquantity,BigDecimalunitPrice){}}// 订单支付事件publicclassOrderPaidEventextendsDomainEvent{privatefinalStringpaymentId;privatefinalBigDecimalpaidAmount;privatefinalStringpaymentMethod;publicOrderPaidEvent(StringorderId,StringpaymentId,BigDecimalpaidAmount,StringpaymentMethod){super(orderId,"Order",0);// 版本在聚合中设置this.paymentId=paymentId;this.paidAmount=paidAmount;this.paymentMethod=paymentMethod;}@OverridepublicStringgetEventType(){return"OrderPaid";}}// 订单取消事件publicclassOrderCancelledEventextendsDomainEvent{privatefinalStringreason;privatefinalStringcancelledBy;publicOrderCancelledEvent(StringorderId,Stringreason,StringcancelledBy){super(orderId,"Order",0);this.reason=reason;this.cancelledBy=cancelledBy;}@OverridepublicStringgetEventType(){return"OrderCancelled";}}2.2 事件存储【原理】事件存储是事件溯源的核心组件,负责持久化事件流。事件存储表设计: ┌─────────────────────────────────────────────────────────────┐ │ events 表 │ ├─────────────────────────────────────────────────────────────┤ │ 列名 │ 类型 │ 说明 │ │───────────────────│───────────────│────────────────────────│ │ event_id │ VARCHAR(36) │ 事件唯一 ID(主键) │ │ aggregate_id │ VARCHAR(36) │ 聚合根 ID │ │ aggregate_type │ VARCHAR(50) │ 聚合类型(Order等) │ │ version │ INT │ 版本号(乐观锁) │ │ event_type │ VARCHAR(100) │ 事件类型 │ │ event_data │ JSON │ 事件数据 │ │ occurred_on │ TIMESTAMP │ 事件发生时间 │ │ metadata │ JSON │ 元数据 │ ├─────────────────────────────────────────────────────────────┤ │ 索引: │ │ - PRIMARY KEY (event_id) │ │ - UNIQUE KEY (aggregate_id, version) -- 防止版本冲突 │ │ - INDEX (aggregate_type, aggregate_id) │ │ - INDEX (occurred_on) │ └─────────────────────────────────────────────────────────────┘ 关键约束: - (aggregate_id, version) 必须唯一:保证事件顺序 - version 必须连续:保证事件流完整性 - 事件只能追加:不能 UPDATE 或 DELETE【代码示例】// ✅ 事件存储实现// 事件存储接口publicinterfaceEventStore{// 保存事件(带乐观锁)voidsaveEvents(StringaggregateId,ListDomainEventevents,intexpectedVersion);// 加载事件流ListDomainEventloadEvents(StringaggregateId);// 加载事件流(从指定版本开始)ListDomainEventloadEvents(StringaggregateId,