文章

07 告警状态机驱动的聚合

07 告警状态机驱动的聚合

之前讲过 ProductModel 这种结构保证了聚合内部多个字段的业务一致性。本篇讲另一类聚合状态机驱动的聚合,以 alarm 上下文里的 Alarm 为代表。

项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

目录

Alarm 状态机:三态、四种转移

防重不是去重,是领域不变量

乐观并发:用前置状态当版本号

创建路径和状态变更路径的区别

Alarm 状态机:三态、四种转移

AlarmStatus 是一个简单的三态枚举:

1
2
3
4
5
public enum AlarmStatus {
    OPEN,    // 已创建、未确认
    ACKED,   // 已确认、未关闭
    CLOSED   // 已关闭
}

合法的状态转移图:

1
2
3
4
5
6
7
[规则命中]
    ↓
   OPEN ──acknowledge──▶ ACKED
    │                      │
    └──close──┐            │
              ↓            │
           CLOSED ◀────close

四条合法转移:

  • 规则命中 → OPEN(创建)
  • OPENACKED(确认)
  • OPENCLOSED(直接关闭,不必确认)
  • ACKEDCLOSED(确认后关闭)

所有非法转移都被领域代码拒绝,不能 ACKED → OPEN 反悔,不能 CLOSED → OPEN 复活,不能重复 acknowledge

状态转移由 AlarmStatusPolicy 集中管理

Alarm 聚合的状态切换方法都返回新对象(record 不可变):

1
2
3
4
5
6
7
8
9
10
11
public Alarm acknowledge(Instant acknowledgedAt) {
    Objects.requireNonNull(acknowledgedAt, "acknowledgedAt must not be null");
    AlarmStatusPolicy.ensureAcknowledgeAllowed(status);   // ← 守护
    return new Alarm(/* 状态切换为 ACKED */);
}

public Alarm close(Instant closedAt) {
    Objects.requireNonNull(closedAt, "closedAt must not be null");
    AlarmStatusPolicy.ensureCloseAllowed(status);          // ← 守护
    return new Alarm(/* 状态切换为 CLOSED */);
}

字段一致性由策略管理

光有状态机还不够,状态切换会带出”时间字段一致性”问题:

  • OPEN 的告警不能有 acknowledgedAtclosedAt
  • ACKED 的告警必须有 acknowledgedAt,但不能有 closedAt
  • CLOSED 的告警必须有 closedAt
  • 各时间戳都不能早于 triggeredAt

这些约束在 AlarmStatusPolicy.validateLifecycle(...) 里集中体现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void validateLifecycle(
        AlarmStatus status,
        Instant triggeredAt,
        Instant acknowledgedAt,
        Instant closedAt) {
    if (acknowledgedAt != null && acknowledgedAt.isBefore(triggeredAt)) {
        throw new BaseDomainException("acknowledgedAt must not be before triggeredAt");
    }
    if (closedAt != null && closedAt.isBefore(triggeredAt)) {
        throw new BaseDomainException("closedAt must not be before triggeredAt");
    }
    if (status == AlarmStatus.OPEN && (acknowledgedAt != null || closedAt != null)) {
        throw new BaseDomainException("Open alarm must not have acknowledgedAt or closedAt");
    }
    if (status == AlarmStatus.ACKED) {
        if (acknowledgedAt == null) {
            throw new BaseDomainException("Acked alarm must have acknowledgedAt");
        }
        if (closedAt != null) {
            throw new BaseDomainException("Acked alarm must not have closedAt");
        }
    }
    if (status == AlarmStatus.CLOSED && closedAt == null) {
        throw new BaseDomainException("Closed alarm must have closedAt");
    }
}

防重不是去重,是领域不变量

对于像“同一规则会被反复触发,每秒一次的遥测,规则每秒命中一次,业务上这应该只产生一条告警”这种问题,有些做法是在消费者端做去重:用 Redis SETNX、用本地 Map、用 Kafka 消息 ID。本项目的解法是把”重复”建模成一个领域概念:

1
2
3
4
5
public final class AlarmDedupKeyPolicy {
    public static AlarmDedupKey build(String ruleCode, String deviceId, Long telemetryEventId) {
        return new AlarmDedupKey(ruleCode + ":" + deviceId + ":" + telemetryEventId);
    }
}

AlarmDedupKey 是一个值对象,由三个领域字段拼成:规则编码 + 设备 ID + 遥测事件 ID。它表达的是一个明确的业务断言——

同一条遥测事件触发的同一条规则,业务上只能产生一个告警。

这条断言在数据库层做了硬约束:

1
2
3
INSERT INTO alarm_event (...)
VALUES (...)
ON CONFLICT (dedup_key) DO NOTHING

dedup_key 字段上有唯一约束。重复触发时,第二次 INSERT 直接被数据库拒绝,应用层拿到 insertedRows = 0 就知道”这条已经存在了”。

为什么 telemetryEventId 是关键

telemetryEventId 是 telemetry 上下文产出的ID,同一条遥测无论被重投几次,事件 id 都不变(参见第 1 篇里讲过的 SHA256 算法)。把它放进防重键,让 alarm 上下文获得了一个非常好的性质:

同一条遥测 → 同一个 telemetryEventId → 同一个 dedupKey → 至多一条告警

这条链条让 alarm 上下文不依赖任何中间件层去重,如Kafka at-least-once、消费者重试、补偿任务重放,全部都被 dedupKey 这一道防线搞定。

AlarmSaveResult 让”创建”和”已存在”可区分

仓储层返回的不只是 Alarm,还包括”这次是创建还是命中已有”:

1
public record AlarmSaveResult(Alarm alarm, boolean created) {}

应用服务用这个返回值决定要不要发”创建事件”:

1
2
3
4
5
6
7
8
9
@Transactional
public AlarmSaveResult createIfAbsent(CreateAlarmFromRuleCommand command) {
    Alarm alarm = Alarm.openFromRule(...);
    AlarmSaveResult saveResult = alarmRepository.saveIfAbsent(alarm);
    if (saveResult.created()) {
        publishCreated(saveResult.alarm());   // ← 只有真创建了才发事件
    }
    return saveResult;
}

这样就实现了重复触发不会产生重复事件的幂等。如果不区分”创建”和”已存在”,重复触发会让下游(AI、巡检、审计)每次都收到一条 AlarmCreatedEvent,下游又要自己去重,问题就反复横跳。

乐观并发:用前置状态当版本号

如果两个人同时操作同一条告警怎么办

举个真实场景:运维 A 点确认告警,运维 B 同时点关闭告警。两个请求几乎同时到达后端,先读各自看到的都是 OPEN,各自做完合法性判断后写库。如果不做并发控制,最后状态可能是:

  • A 后写:状态 ACKED,但 closedAt 已经被 B 写入
  • B 后写:状态 CLOSED,但 acknowledgedAt 没有被记录

两种结果都让告警的状态/时间字段陷入不一致。这正是状态机聚合最怕的事——外部世界看到一个不可能的状态

教科书的做法:乐观锁版本号

教科书会让你加一个 @Version 字段。每次 update 时 SQL 带 WHERE id = ? AND version = ?,匹配不上就抛 OptimisticLockException。这种做法没问题,但它有一个领域语义上的妥协——版本号是给代码看的,业务方看不见它。运维 B 看到的是”状态是 OPEN”,他不会理解”version=3”是什么。

本项目的做法:用前置状态当版本号

AlarmEventMapper 里的 update SQL 是这样的:

1
2
3
4
5
6
7
8
9
10
11
@Update("""
    UPDATE alarm_event
    SET status = #{record.status},
        acknowledged_at = #{record.acknowledgedAt},
        closed_at = #{record.closedAt}
    WHERE id = #{record.id}
      AND status = #{expectedStatus}
""")
int updateStatusIfCurrentStatusMatches(
        @Param("record") AlarmRecord record,
        @Param("expectedStatus") String expectedStatus);

注意 WHERE id = ? AND status = ?——多了一个 status = expectedStatus。应用服务调用时把”我以为的当前状态”传进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public Alarm acknowledge(AcknowledgeAlarmCommand command) {
    Alarm currentAlarm = alarmRepository.load(command.alarmId());
    Alarm acknowledgedAlarm = currentAlarm.acknowledge(command.acknowledgedAt());
    AlarmStatusUpdateResult updateResult = alarmRepository.updateStatusIfCurrentStatusMatches(
            acknowledgedAlarm,
            currentAlarm.status());      // ← 把"我以为的当前状态"传进去
    Alarm savedAlarm = updateResult.alarm();
    if (updateResult.changed() && savedAlarm.status() == AlarmStatus.ACKED) {
        publishAcknowledged(savedAlarm);
    }
    return savedAlarm;
}

回到那个并发场景。运维 A 和 B 都先读到 OPEN

  • A 写:UPDATE ... WHERE id = X AND status = 'OPEN',update 成功,状态变 ACKED
  • B 写:UPDATE ... WHERE id = X AND status = 'OPEN',update 影响 0 行(因为状态已经是 ACKED,不再是 OPEN

B 的请求安静地什么都没做——AlarmStatusUpdateResult.changed() 返回 false,应用层不会发”已关闭”事件,也不会留下假的状态字段。

这种做法比传统乐观锁多了一层领域语义

  • 传统乐观锁拦的是”版本号过期”,错误信息是 OptimisticLockException,业务方看不懂
  • 前置状态拦的是”业务前置条件不成立”,错误信息可以是”告警已被他人确认”,业务方看得懂

其实状态本身就是版本号,告警的状态机是单向递进的(OPEN → ACKED → CLOSED,不可逆),状态变了等于发生了一次有效更新;状态没变等于没发生更新。这个性质让”前置状态”天然适合做乐观并发控制,不需要额外加 version 字段。

创建路径和状态变更路径的区别

“创建告警”和”修改告警状态”是两条不同的写入路径,分别用不同的并发策略。

创建路径:createIfAbsent

RuleTriggeredAlarmHandler 订阅 RuleTriggeredEvent 触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@EventListener
public void onRuleTriggered(RuleTriggeredEvent event) {
    AlarmSaveResult saveResult = alarmApplicationService.createIfAbsent(
            new CreateAlarmFromRuleCommand(
                    event.ruleCode(),
                    event.telemetryEventId(),
                    event.deviceId(),
                    event.metricName().value(),
                    event.metricValue(),
                    event.threshold(),
                    event.triggeredAt()));
    if (saveResult.created()) {
        // ... 记 metric、打 log
        return;
    }
    // ... 重复触发,记 deduplicated metric
}

创建路径的并发控制是AlarmDedupKey 唯一约束

  • 高并发下同一规则被几十个消费者实例同时触发,数据库的唯一约束保证只有一行写入成功;
  • 第二个请求拿到 insertedRows = 0,应用层返回 created = false
  • 重复触发不会引发任何下游事件。

注意创建路径没有用前置状态当版本号,因为创建时没有”前置状态”可言,dedup key 已经够了。

状态变更路径:acknowledge / close

由 HTTP API 触发,对应人工操作。这条路径用前置状态做乐观并发

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public Alarm acknowledge(AcknowledgeAlarmCommand command) {
    Alarm currentAlarm = alarmRepository.load(command.alarmId());
    Alarm acknowledgedAlarm = currentAlarm.acknowledge(command.acknowledgedAt());
    AlarmStatusUpdateResult updateResult = alarmRepository.updateStatusIfCurrentStatusMatches(
            acknowledgedAlarm,
            currentAlarm.status());
    Alarm savedAlarm = updateResult.alarm();
    if (updateResult.changed() && savedAlarm.status() == AlarmStatus.ACKED) {
        publishAcknowledged(savedAlarm);
    }
    return savedAlarm;
}

注意 updateResult.changed() 这个只有真的发生状态变化时才发事件。这点和创建路径里的 saveResult.created() 是同一个思路:事件发布的前提是”真的有变化”,而不是”调用了变更方法”

两条路径都满足”事件发布幂等”

把两条路径合在一起看,整个 alarm 上下文对外发出的领域事件是幂等的

  • 同一规则反复触发 → dedupKey 拦住 → 只发一次 AlarmCreatedEvent
  • 同一告警反复确认 → 前置状态 拦住 → 只发一次 AlarmAcknowledgedEvent
  • 同一告警反复关闭 → 前置状态 拦住 → 只发一次 AlarmClosedEvent

下游(AI 摘要、巡检建单、审计、推送)订阅这些事件时不需要自己再去重,上游已经在领域层去重了。


项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

本文由作者按照 CC BY 4.0 进行授权