07 告警状态机驱动的聚合
之前讲过 ProductModel 这种结构保证了聚合内部多个字段的业务一致性。本篇讲另一类聚合状态机驱动的聚合,以 alarm 上下文里的 Alarm 为代表。
目录
Alarm 状态机:三态、四种转移
防重不是去重,是领域不变量
乐观并发:用前置状态当版本号
创建路径和状态变更路径的区别
Alarm 状态机:三态、四种转移
AlarmStatus 是一个简单的三态枚举:
1
2
3
4
5
public enum AlarmStatus {
OPEN, // 已创建、未确认
ACKED, // 已确认、未关闭
CLOSED // 已关闭
}
合法的状态转移图:
1
2
3
4
5
6
7
[规则命中]
↓
OPEN ──acknowledge──▶ ACKED
│ │
└──close──┐ │
↓ │
CLOSED ◀────close
四条合法转移:
- 规则命中 →
OPEN(创建) OPEN→ACKED(确认)OPEN→CLOSED(直接关闭,不必确认)ACKED→CLOSED(确认后关闭)
所有非法转移都被领域代码拒绝,不能 ACKED → OPEN 反悔,不能 CLOSED → OPEN 复活,不能重复 acknowledge。
状态转移由 AlarmStatusPolicy 集中管理
Alarm 聚合的状态切换方法都返回新对象(record 不可变):
1
2
3
4
5
6
7
8
9
10
11
public Alarm acknowledge(Instant acknowledgedAt) {
Objects.requireNonNull(acknowledgedAt, "acknowledgedAt must not be null");
AlarmStatusPolicy.ensureAcknowledgeAllowed(status); // ← 守护
return new Alarm(/* 状态切换为 ACKED */);
}
public Alarm close(Instant closedAt) {
Objects.requireNonNull(closedAt, "closedAt must not be null");
AlarmStatusPolicy.ensureCloseAllowed(status); // ← 守护
return new Alarm(/* 状态切换为 CLOSED */);
}
字段一致性由策略管理
光有状态机还不够,状态切换会带出”时间字段一致性”问题:
OPEN的告警不能有acknowledgedAt或closedAtACKED的告警必须有acknowledgedAt,但不能有closedAtCLOSED的告警必须有closedAt- 各时间戳都不能早于
triggeredAt
这些约束在 AlarmStatusPolicy.validateLifecycle(...) 里集中体现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void validateLifecycle(
AlarmStatus status,
Instant triggeredAt,
Instant acknowledgedAt,
Instant closedAt) {
if (acknowledgedAt != null && acknowledgedAt.isBefore(triggeredAt)) {
throw new BaseDomainException("acknowledgedAt must not be before triggeredAt");
}
if (closedAt != null && closedAt.isBefore(triggeredAt)) {
throw new BaseDomainException("closedAt must not be before triggeredAt");
}
if (status == AlarmStatus.OPEN && (acknowledgedAt != null || closedAt != null)) {
throw new BaseDomainException("Open alarm must not have acknowledgedAt or closedAt");
}
if (status == AlarmStatus.ACKED) {
if (acknowledgedAt == null) {
throw new BaseDomainException("Acked alarm must have acknowledgedAt");
}
if (closedAt != null) {
throw new BaseDomainException("Acked alarm must not have closedAt");
}
}
if (status == AlarmStatus.CLOSED && closedAt == null) {
throw new BaseDomainException("Closed alarm must have closedAt");
}
}
防重不是去重,是领域不变量
对于像“同一规则会被反复触发,每秒一次的遥测,规则每秒命中一次,业务上这应该只产生一条告警”这种问题,有些做法是在消费者端做去重:用 Redis SETNX、用本地 Map、用 Kafka 消息 ID。本项目的解法是把”重复”建模成一个领域概念:
1
2
3
4
5
public final class AlarmDedupKeyPolicy {
public static AlarmDedupKey build(String ruleCode, String deviceId, Long telemetryEventId) {
return new AlarmDedupKey(ruleCode + ":" + deviceId + ":" + telemetryEventId);
}
}
AlarmDedupKey 是一个值对象,由三个领域字段拼成:规则编码 + 设备 ID + 遥测事件 ID。它表达的是一个明确的业务断言——
同一条遥测事件触发的同一条规则,业务上只能产生一个告警。
这条断言在数据库层做了硬约束:
1
2
3
INSERT INTO alarm_event (...)
VALUES (...)
ON CONFLICT (dedup_key) DO NOTHING
dedup_key 字段上有唯一约束。重复触发时,第二次 INSERT 直接被数据库拒绝,应用层拿到 insertedRows = 0 就知道”这条已经存在了”。
为什么 telemetryEventId 是关键
telemetryEventId 是 telemetry 上下文产出的ID,同一条遥测无论被重投几次,事件 id 都不变(参见第 1 篇里讲过的 SHA256 算法)。把它放进防重键,让 alarm 上下文获得了一个非常好的性质:
同一条遥测 → 同一个 telemetryEventId → 同一个 dedupKey → 至多一条告警
这条链条让 alarm 上下文不依赖任何中间件层去重,如Kafka at-least-once、消费者重试、补偿任务重放,全部都被 dedupKey 这一道防线搞定。
AlarmSaveResult 让”创建”和”已存在”可区分
仓储层返回的不只是 Alarm,还包括”这次是创建还是命中已有”:
1
public record AlarmSaveResult(Alarm alarm, boolean created) {}
应用服务用这个返回值决定要不要发”创建事件”:
1
2
3
4
5
6
7
8
9
@Transactional
public AlarmSaveResult createIfAbsent(CreateAlarmFromRuleCommand command) {
Alarm alarm = Alarm.openFromRule(...);
AlarmSaveResult saveResult = alarmRepository.saveIfAbsent(alarm);
if (saveResult.created()) {
publishCreated(saveResult.alarm()); // ← 只有真创建了才发事件
}
return saveResult;
}
这样就实现了重复触发不会产生重复事件的幂等。如果不区分”创建”和”已存在”,重复触发会让下游(AI、巡检、审计)每次都收到一条 AlarmCreatedEvent,下游又要自己去重,问题就反复横跳。
乐观并发:用前置状态当版本号
如果两个人同时操作同一条告警怎么办?
举个真实场景:运维 A 点确认告警,运维 B 同时点关闭告警。两个请求几乎同时到达后端,先读各自看到的都是 OPEN,各自做完合法性判断后写库。如果不做并发控制,最后状态可能是:
- A 后写:状态
ACKED,但 closedAt 已经被 B 写入 - B 后写:状态
CLOSED,但 acknowledgedAt 没有被记录
两种结果都让告警的状态/时间字段陷入不一致。这正是状态机聚合最怕的事——外部世界看到一个不可能的状态。
教科书的做法:乐观锁版本号
教科书会让你加一个 @Version 字段。每次 update 时 SQL 带 WHERE id = ? AND version = ?,匹配不上就抛 OptimisticLockException。这种做法没问题,但它有一个领域语义上的妥协——版本号是给代码看的,业务方看不见它。运维 B 看到的是”状态是 OPEN”,他不会理解”version=3”是什么。
本项目的做法:用前置状态当版本号
AlarmEventMapper 里的 update SQL 是这样的:
1
2
3
4
5
6
7
8
9
10
11
@Update("""
UPDATE alarm_event
SET status = #{record.status},
acknowledged_at = #{record.acknowledgedAt},
closed_at = #{record.closedAt}
WHERE id = #{record.id}
AND status = #{expectedStatus}
""")
int updateStatusIfCurrentStatusMatches(
@Param("record") AlarmRecord record,
@Param("expectedStatus") String expectedStatus);
注意 WHERE id = ? AND status = ?——多了一个 status = expectedStatus。应用服务调用时把”我以为的当前状态”传进去:
1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public Alarm acknowledge(AcknowledgeAlarmCommand command) {
Alarm currentAlarm = alarmRepository.load(command.alarmId());
Alarm acknowledgedAlarm = currentAlarm.acknowledge(command.acknowledgedAt());
AlarmStatusUpdateResult updateResult = alarmRepository.updateStatusIfCurrentStatusMatches(
acknowledgedAlarm,
currentAlarm.status()); // ← 把"我以为的当前状态"传进去
Alarm savedAlarm = updateResult.alarm();
if (updateResult.changed() && savedAlarm.status() == AlarmStatus.ACKED) {
publishAcknowledged(savedAlarm);
}
return savedAlarm;
}
回到那个并发场景。运维 A 和 B 都先读到 OPEN:
- A 写:
UPDATE ... WHERE id = X AND status = 'OPEN',update 成功,状态变ACKED - B 写:
UPDATE ... WHERE id = X AND status = 'OPEN',update 影响 0 行(因为状态已经是ACKED,不再是OPEN)
B 的请求安静地什么都没做——AlarmStatusUpdateResult.changed() 返回 false,应用层不会发”已关闭”事件,也不会留下假的状态字段。
这种做法比传统乐观锁多了一层领域语义:
- 传统乐观锁拦的是”版本号过期”,错误信息是
OptimisticLockException,业务方看不懂 - 前置状态拦的是”业务前置条件不成立”,错误信息可以是”告警已被他人确认”,业务方看得懂
其实状态本身就是版本号,告警的状态机是单向递进的(OPEN → ACKED → CLOSED,不可逆),状态变了等于发生了一次有效更新;状态没变等于没发生更新。这个性质让”前置状态”天然适合做乐观并发控制,不需要额外加 version 字段。
创建路径和状态变更路径的区别
“创建告警”和”修改告警状态”是两条不同的写入路径,分别用不同的并发策略。
创建路径:createIfAbsent
由 RuleTriggeredAlarmHandler 订阅 RuleTriggeredEvent 触发:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@EventListener
public void onRuleTriggered(RuleTriggeredEvent event) {
AlarmSaveResult saveResult = alarmApplicationService.createIfAbsent(
new CreateAlarmFromRuleCommand(
event.ruleCode(),
event.telemetryEventId(),
event.deviceId(),
event.metricName().value(),
event.metricValue(),
event.threshold(),
event.triggeredAt()));
if (saveResult.created()) {
// ... 记 metric、打 log
return;
}
// ... 重复触发,记 deduplicated metric
}
创建路径的并发控制是AlarmDedupKey 唯一约束:
- 高并发下同一规则被几十个消费者实例同时触发,数据库的唯一约束保证只有一行写入成功;
- 第二个请求拿到
insertedRows = 0,应用层返回created = false; - 重复触发不会引发任何下游事件。
注意创建路径没有用前置状态当版本号,因为创建时没有”前置状态”可言,dedup key 已经够了。
状态变更路径:acknowledge / close
由 HTTP API 触发,对应人工操作。这条路径用前置状态做乐观并发:
1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public Alarm acknowledge(AcknowledgeAlarmCommand command) {
Alarm currentAlarm = alarmRepository.load(command.alarmId());
Alarm acknowledgedAlarm = currentAlarm.acknowledge(command.acknowledgedAt());
AlarmStatusUpdateResult updateResult = alarmRepository.updateStatusIfCurrentStatusMatches(
acknowledgedAlarm,
currentAlarm.status());
Alarm savedAlarm = updateResult.alarm();
if (updateResult.changed() && savedAlarm.status() == AlarmStatus.ACKED) {
publishAcknowledged(savedAlarm);
}
return savedAlarm;
}
注意 updateResult.changed() 这个只有真的发生状态变化时才发事件。这点和创建路径里的 saveResult.created() 是同一个思路:事件发布的前提是”真的有变化”,而不是”调用了变更方法”。
两条路径都满足”事件发布幂等”
把两条路径合在一起看,整个 alarm 上下文对外发出的领域事件是幂等的:
- 同一规则反复触发 →
dedupKey拦住 → 只发一次AlarmCreatedEvent; - 同一告警反复确认 →
前置状态拦住 → 只发一次AlarmAcknowledgedEvent; - 同一告警反复关闭 →
前置状态拦住 → 只发一次AlarmClosedEvent。
下游(AI 摘要、巡检建单、审计、推送)订阅这些事件时不需要自己再去重,上游已经在领域层去重了。