文章

10 下行命令链路和影子期望

10 下行命令链路和影子期望

前几篇讲的全是上行链路“设备 → broker → kafka → access → telemetry → rule → alarm → ai → audit”。本篇讲下行“平台怎么把一条指令送到设备、怎么知道设备收到了、收不到怎么办”。下行链路在 IoT 平台里看起来单薄,但失败模式和复杂度比上行要高。

项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

目录

下行链路为什么比上行难

DeviceCommand 状态机:5 态 + 3 条终结路径

下发路径:写库 → MQTT 发布 → markSent

ACK 路径:异步、幂等、迟到容忍

影子期望

下行链路为什么比上行难

上行链路看起来复杂,协议、schema、规则、告警、AI、审计,但都是设备主动推送、平台被动接收处理,无交互、相对简单。

下行则反过来,平台主动推、设备被动收,这样就有三个问题出现了:

1. 设备可能不在线

设备掉线、设备断电、设备的 MQTT 连接刚被踢掉,平台不一定知道;另外,命令发出去,broker 接收了不等于设备接收了。

2. 设备的”成功”和”平台的成功”不是一回事

平台把命令发到 broker、broker 给了 PUBACK,平台以为成功了,但设备可能根本没收到。要让”平台知道设备收到了”,必须让设备主动 ACK,这条 ACK 走的还是上行链路。

3. 命令的副作用不能撤回

上行只是”读取数据”,错了重读一次就好;下行是”改变设备状态”,发一条”设备重启”命令出去,一旦发错就只能补救,不能回滚。

所以总结一下,就下面三句话:

  • 设备可能永远不会 ACK;
  • ACK 可能在任何时间到达(30 秒内、3 小时后、3 天后);
  • 命令一旦发出,”已发送”和”已生效”是两个不同的事实。

本项目的 iot-context-command 上下文就是围绕这三件事设计,下面展开细说。

DeviceCommand 状态机:5 态 + 3 条终结路径

DeviceCommand 是 command 上下文的聚合根,5 个状态:

1
2
3
4
5
6
7
public enum CommandStatus {
    CREATED,        // 已创建,未发出
    SENT,           // 已发出,等 ACK
    ACKED_SUCCESS,  // 设备回 ACK,执行成功
    ACKED_FAILED,   // 设备回 ACK,执行失败
    TIMED_OUT       // 等 ACK 超时
}

合法状态转移图:

1
2
3
4
5
6
7
[创建]
   ↓
CREATED ──markSent──▶ SENT ──markAckSuccess──▶ ACKED_SUCCESS  (终态)
                       │
                       ├──markAckFailed───────▶ ACKED_FAILED   (终态)
                       │
                       └──markTimedOut───────▶ TIMED_OUT       (终态)

这个结构跟 07 篇的 Alarm 不同,Alarm 是 3 态线性递进;DeviceCommand 是 5 态 + 3 条并列终态,对应“设备 ACK 成功、设备 ACK 失败、超时无 ACK”三种不同的结局。

下发路径:写库 → MQTT 发布 → markSent

CommandApplicationService.sendSetReportInterval 是下发命令的入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Transactional
public DeviceCommandVO sendSetReportInterval(SendSetReportIntervalCommand command) {
    Instant now = command.requestedAt();
    String commandId = "cmd-" + UUID.randomUUID();
    String payloadJson = buildSetReportIntervalPayload(...);

    // 1. 创建命令并落库(CREATED 状态)
    DeviceCommand created = DeviceCommand.createSetReportInterval(commandId, command.deviceId(), payloadJson, now);
    DeviceCommandSaveResult saveResult = deviceCommandRepository.saveIfAbsent(created);
    DeviceCommand saved = saveResult.command();

    // 2. 发布 CommandCreatedEvent(让 audit 留痕)
    applicationEventPublisher.publishEvent(new CommandCreatedEvent(...));

    // 3. 通过 MQTT 发布到设备
    Instant publishStartedAt = Instant.now();
    commandMqttPublishPort.publish(saved.deviceId(), saved.payloadJson());
    commandPublishTimer.record(Duration.between(publishStartedAt, Instant.now()));

    // 4. 切换状态到 SENT
    DeviceCommandStatusUpdateResult updateResult = deviceCommandRepository.updateStatus(saved.markSent(now));
    return toVO(updateResult.command());
}

但中间还有个小问题:MQTT 发完才 markSent

注意第 3 步和第 4 步,MQTT 已经发出去了,但 markSent 在第 4 步才执行。如果第 3 步成功、第 4 步失败(比如数据库瞬时不可用),命令在数据库里是 CREATED 状态,但实际上已经发出去了,这就会产生一个bug。

当前的处理方式是:

  • 整个方法在一个 @Transactional 里,如果第 4 步失败,第 1 步也会回滚,命令记录不会留下,但 MQTT 那边其实已经发了;
  • 所以最坏情况是”命令已发但平台无记录”;

真正的解法:outbox 模式

要把这里做严谨,就前面几篇文章中提到的,生产环境应该走 outbox 模式:

  • 第 1 步在事务里写两张表:device_command(业务表)+ command_outbox(待发表);
  • 事务提交后,独立的 publisher 进程从 outbox 读出来发 MQTT;
  • 发成功后,把 outbox 行标记为已发,并 update device_command.status = SENT

这样保证”业务记录和 MQTT 发送”两件事最终一致,但代价是增加一个 publisher 进程、增加发送延迟、需要处理 publisher 自己的故障转移。

MQTT 发布也是端口

注意 commandMqttPublishPort 是一个端口接口:

1
2
3
public interface CommandMqttPublishPort {
    void publish(String deviceId, String payloadJson);
}

实现是 MqttCommandPublisher,但应用服务不知道。未来切到 Kafka 下行、HTTP 下行、CoAP 下行都可以,业务代码不动。

ACK 路径:异步、幂等、迟到容忍

设备处理完命令后会上行一条 ACK 消息。这条 ACK 在本项目里走的是和遥测同样的路径 MQTT → bridge → Kafka → backend 消费,只不过 topic 不同。

CommandAckKafkaConsumer 收到消息后调 CommandAckApplicationService.handleAck

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Transactional
public void handleAck(CommandAckPayload payload) {
    DeviceCommand current = deviceCommandRepository.loadByCommandId(payload.commandId());

    // 1. 幂等:已经 ACK 过的,直接忽略
    if (current.isAcked()) {
        log.info("command-ack-ignored commandId={} reason=already-acked", current.commandId());
        return;
    }

    // 2. 根据 ACK 内容切换状态(状态上也保证了幂等)
    DeviceCommand updated = isSuccess(payload.status())
            ? current.markAckSuccess(payload.message(), payload.ackedAt())
            : current.markAckFailed(payload.message(), payload.ackedAt());
    DeviceCommandStatusUpdateResult result = deviceCommandRepository.updateStatus(updated);

    // 3. 记 metrics
    if (isSuccess(payload.status())) {
        ackSuccessCounter.increment();
    } else {
        ackFailedCounter.increment();
    }
    if (result.command().sentAt() != null) {
        ackLatencyTimer.record(Duration.between(result.command().sentAt(), payload.ackedAt()));
    }

    // 4. 发布 CommandAckReceivedEvent
    applicationEventPublisher.publishEvent(new CommandAckReceivedEvent(...));
}

迟到 ACK 怎么处理

注意 markAckSuccess 接受任意时间点的 ackedAt,理论上可以是 5 分钟前、5 小时前、甚至 5 天前的时间。但问题来了:如果命令已经被超时扫描标记为 TIMED_OUT,迟到 ACK 进来怎么办?

ensureAckAllowed() 会拒绝,TIMED_OUT 不是 SENT。这条 ACK 会被悄悄丢弃(应用层抛异常被吞),下游什么都不会发生。

为什么这么做?原因是:

  • 一旦平台超时认定 “设备没收到”,运维可能已经做了重发或排障;
  • 这时候迟到 ACK 进来,业务上应该被忽略(因为已经基于”超时”做过决策);
  • 但要留痕,迟到 ACK 会被记到 audit 日志或日志文件,运维可以查”哪条命令在超时之后又收到了 ACK”。

ACK 消息里的”成功”和”失败”

注意 CommandAckPayload.status 是字符串:”SUCCESS” 或其他值,这是设备上行 ACK 的协议字段。命令的 ACK 不只是 “我收到了”,还包含“我执行成功了/失败了”(这里简化处理;生产环境中,可以考虑再发一条“命令执行结果”的报文)。

设备执行命令可能:

  • 收到了,参数解析失败 → ACK_FAILED
  • 收到了,硬件不支持 → ACK_FAILED
  • 收到了,执行成功 → ACK_SUCCESS

影子期望

本项目的 iot-context-command 是命令式,平台通过 HTTP API 创建一条命令,立即下发到 MQTT,等设备 ACK。

但 IoT 平台还有另一条主流路径,通过影子期望来完成命令下发执行:

维度本项目影子期望式
平台动作创建并立即下发命令修改设备影子的 desired 字段
设备动作收到命令立即响应上线后主动 pull 影子,对比 reported 和 desired,自己同步
离线设备命令发出但收不到 → 超时desired 留在影子里,设备上线后自动同步
状态语义“我让你做这件事”“你应该是这个状态”
适用场景实时控制(重启、立即推送)配置同步(上报间隔、阈值、模式)

03 篇里的 DeviceShadow 中的SHADOW_DESIRED 类型的物模型属性必须可写,对应的就是这条路径:

1
2
case SHADOW_DESIRED -> property.accessMode() != ThingPropertyAccessMode.READ_ONLY
        && shadowSchema.supportsDesiredField(property.capabilityCode().value());

当前项目里 desired 这条路径只走通了 schema 校验,没有真正的同步实现;平台改了 desired 后,怎么通知设备?设备上线后怎么 pull?这套同步机制项目里还没做。

在生产环境的 IoT 平台一般两套都有:

  • 命令式用于”立即类”动作——重启、推送、告警弹窗;
  • 影子期望式用于”配置类”状态——上报间隔、模式、阈值。

本项目的 SET_REPORT_INTERVAL 命令在领域语义上其实更适合影子期望式,上报间隔是”设备的配置状态”,离线设备上线后应该自动同步到新值。

注意,影子不在 command 上下文,而在 device 上下文中,03 篇里 DeviceShadowDevice 聚合的内部实体:

1
2
3
4
5
6
7
8
public record DeviceShadow(Long version, String document, Instant updatedAt) {
    public static DeviceShadow create(String document, Instant updatedAt) {
        return new DeviceShadow(1L, document, updatedAt);
    }
    public DeviceShadow update(String document, Instant updatedAt) {
        return new DeviceShadow(version + 1, document, updatedAt);
    }
}

如果未来加影子同步,应该是:

  • device 上下文新增 Device.changeDesired(document, updatedAt) 方法;
  • 改完发 DeviceShadowDesiredChangedEvent
  • 接入层(access)订阅这个事件,下推到 MQTT;
  • 设备上线时主动 pull 影子(HTTP 或 MQTT 下行 topic)。

iot-context-command 是两条独立的下行链路,各管各的。


项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

本文由作者按照 CC BY 4.0 进行授权