10 下行命令链路和影子期望
前几篇讲的全是上行链路“设备 → broker → kafka → access → telemetry → rule → alarm → ai → audit”。本篇讲下行“平台怎么把一条指令送到设备、怎么知道设备收到了、收不到怎么办”。下行链路在 IoT 平台里看起来单薄,但失败模式和复杂度比上行要高。
目录
下行链路为什么比上行难
DeviceCommand 状态机:5 态 + 3 条终结路径
下发路径:写库 → MQTT 发布 → markSent
ACK 路径:异步、幂等、迟到容忍
影子期望
下行链路为什么比上行难
上行链路看起来复杂,协议、schema、规则、告警、AI、审计,但都是设备主动推送、平台被动接收处理,无交互、相对简单。
下行则反过来,平台主动推、设备被动收,这样就有三个问题出现了:
1. 设备可能不在线
设备掉线、设备断电、设备的 MQTT 连接刚被踢掉,平台不一定知道;另外,命令发出去,broker 接收了不等于设备接收了。
2. 设备的”成功”和”平台的成功”不是一回事
平台把命令发到 broker、broker 给了 PUBACK,平台以为成功了,但设备可能根本没收到。要让”平台知道设备收到了”,必须让设备主动 ACK,这条 ACK 走的还是上行链路。
3. 命令的副作用不能撤回
上行只是”读取数据”,错了重读一次就好;下行是”改变设备状态”,发一条”设备重启”命令出去,一旦发错就只能补救,不能回滚。
所以总结一下,就下面三句话:
- 设备可能永远不会 ACK;
- ACK 可能在任何时间到达(30 秒内、3 小时后、3 天后);
- 命令一旦发出,”已发送”和”已生效”是两个不同的事实。
本项目的 iot-context-command 上下文就是围绕这三件事设计,下面展开细说。
DeviceCommand 状态机:5 态 + 3 条终结路径
DeviceCommand 是 command 上下文的聚合根,5 个状态:
1
2
3
4
5
6
7
public enum CommandStatus {
CREATED, // 已创建,未发出
SENT, // 已发出,等 ACK
ACKED_SUCCESS, // 设备回 ACK,执行成功
ACKED_FAILED, // 设备回 ACK,执行失败
TIMED_OUT // 等 ACK 超时
}
合法状态转移图:
1
2
3
4
5
6
7
[创建]
↓
CREATED ──markSent──▶ SENT ──markAckSuccess──▶ ACKED_SUCCESS (终态)
│
├──markAckFailed───────▶ ACKED_FAILED (终态)
│
└──markTimedOut───────▶ TIMED_OUT (终态)
这个结构跟 07 篇的 Alarm 不同,Alarm 是 3 态线性递进;DeviceCommand 是 5 态 + 3 条并列终态,对应“设备 ACK 成功、设备 ACK 失败、超时无 ACK”三种不同的结局。
下发路径:写库 → MQTT 发布 → markSent
CommandApplicationService.sendSetReportInterval 是下发命令的入口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Transactional
public DeviceCommandVO sendSetReportInterval(SendSetReportIntervalCommand command) {
Instant now = command.requestedAt();
String commandId = "cmd-" + UUID.randomUUID();
String payloadJson = buildSetReportIntervalPayload(...);
// 1. 创建命令并落库(CREATED 状态)
DeviceCommand created = DeviceCommand.createSetReportInterval(commandId, command.deviceId(), payloadJson, now);
DeviceCommandSaveResult saveResult = deviceCommandRepository.saveIfAbsent(created);
DeviceCommand saved = saveResult.command();
// 2. 发布 CommandCreatedEvent(让 audit 留痕)
applicationEventPublisher.publishEvent(new CommandCreatedEvent(...));
// 3. 通过 MQTT 发布到设备
Instant publishStartedAt = Instant.now();
commandMqttPublishPort.publish(saved.deviceId(), saved.payloadJson());
commandPublishTimer.record(Duration.between(publishStartedAt, Instant.now()));
// 4. 切换状态到 SENT
DeviceCommandStatusUpdateResult updateResult = deviceCommandRepository.updateStatus(saved.markSent(now));
return toVO(updateResult.command());
}
但中间还有个小问题:MQTT 发完才 markSent
注意第 3 步和第 4 步,MQTT 已经发出去了,但 markSent 在第 4 步才执行。如果第 3 步成功、第 4 步失败(比如数据库瞬时不可用),命令在数据库里是 CREATED 状态,但实际上已经发出去了,这就会产生一个bug。
当前的处理方式是:
- 整个方法在一个
@Transactional里,如果第 4 步失败,第 1 步也会回滚,命令记录不会留下,但 MQTT 那边其实已经发了; - 所以最坏情况是”命令已发但平台无记录”;
真正的解法:outbox 模式
要把这里做严谨,就前面几篇文章中提到的,生产环境应该走 outbox 模式:
- 第 1 步在事务里写两张表:
device_command(业务表)+command_outbox(待发表); - 事务提交后,独立的 publisher 进程从 outbox 读出来发 MQTT;
- 发成功后,把 outbox 行标记为已发,并 update
device_command.status = SENT;
这样保证”业务记录和 MQTT 发送”两件事最终一致,但代价是增加一个 publisher 进程、增加发送延迟、需要处理 publisher 自己的故障转移。
MQTT 发布也是端口
注意 commandMqttPublishPort 是一个端口接口:
1
2
3
public interface CommandMqttPublishPort {
void publish(String deviceId, String payloadJson);
}
实现是 MqttCommandPublisher,但应用服务不知道。未来切到 Kafka 下行、HTTP 下行、CoAP 下行都可以,业务代码不动。
ACK 路径:异步、幂等、迟到容忍
设备处理完命令后会上行一条 ACK 消息。这条 ACK 在本项目里走的是和遥测同样的路径 MQTT → bridge → Kafka → backend 消费,只不过 topic 不同。
CommandAckKafkaConsumer 收到消息后调 CommandAckApplicationService.handleAck:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Transactional
public void handleAck(CommandAckPayload payload) {
DeviceCommand current = deviceCommandRepository.loadByCommandId(payload.commandId());
// 1. 幂等:已经 ACK 过的,直接忽略
if (current.isAcked()) {
log.info("command-ack-ignored commandId={} reason=already-acked", current.commandId());
return;
}
// 2. 根据 ACK 内容切换状态(状态上也保证了幂等)
DeviceCommand updated = isSuccess(payload.status())
? current.markAckSuccess(payload.message(), payload.ackedAt())
: current.markAckFailed(payload.message(), payload.ackedAt());
DeviceCommandStatusUpdateResult result = deviceCommandRepository.updateStatus(updated);
// 3. 记 metrics
if (isSuccess(payload.status())) {
ackSuccessCounter.increment();
} else {
ackFailedCounter.increment();
}
if (result.command().sentAt() != null) {
ackLatencyTimer.record(Duration.between(result.command().sentAt(), payload.ackedAt()));
}
// 4. 发布 CommandAckReceivedEvent
applicationEventPublisher.publishEvent(new CommandAckReceivedEvent(...));
}
迟到 ACK 怎么处理
注意 markAckSuccess 接受任意时间点的 ackedAt,理论上可以是 5 分钟前、5 小时前、甚至 5 天前的时间。但问题来了:如果命令已经被超时扫描标记为 TIMED_OUT,迟到 ACK 进来怎么办?
ensureAckAllowed() 会拒绝,TIMED_OUT 不是 SENT。这条 ACK 会被悄悄丢弃(应用层抛异常被吞),下游什么都不会发生。
为什么这么做?原因是:
- 一旦平台超时认定 “设备没收到”,运维可能已经做了重发或排障;
- 这时候迟到 ACK 进来,业务上应该被忽略(因为已经基于”超时”做过决策);
- 但要留痕,迟到 ACK 会被记到 audit 日志或日志文件,运维可以查”哪条命令在超时之后又收到了 ACK”。
ACK 消息里的”成功”和”失败”
注意 CommandAckPayload.status 是字符串:”SUCCESS” 或其他值,这是设备上行 ACK 的协议字段。命令的 ACK 不只是 “我收到了”,还包含“我执行成功了/失败了”(这里简化处理;生产环境中,可以考虑再发一条“命令执行结果”的报文)。
设备执行命令可能:
- 收到了,参数解析失败 → ACK_FAILED
- 收到了,硬件不支持 → ACK_FAILED
- 收到了,执行成功 → ACK_SUCCESS
影子期望
本项目的 iot-context-command 是命令式,平台通过 HTTP API 创建一条命令,立即下发到 MQTT,等设备 ACK。
但 IoT 平台还有另一条主流路径,通过影子期望来完成命令下发执行:
| 维度 | 本项目 | 影子期望式 |
|---|---|---|
| 平台动作 | 创建并立即下发命令 | 修改设备影子的 desired 字段 |
| 设备动作 | 收到命令立即响应 | 上线后主动 pull 影子,对比 reported 和 desired,自己同步 |
| 离线设备 | 命令发出但收不到 → 超时 | desired 留在影子里,设备上线后自动同步 |
| 状态语义 | “我让你做这件事” | “你应该是这个状态” |
| 适用场景 | 实时控制(重启、立即推送) | 配置同步(上报间隔、阈值、模式) |
03 篇里的 DeviceShadow 中的SHADOW_DESIRED 类型的物模型属性必须可写,对应的就是这条路径:
1
2
case SHADOW_DESIRED -> property.accessMode() != ThingPropertyAccessMode.READ_ONLY
&& shadowSchema.supportsDesiredField(property.capabilityCode().value());
当前项目里 desired 这条路径只走通了 schema 校验,没有真正的同步实现;平台改了 desired 后,怎么通知设备?设备上线后怎么 pull?这套同步机制项目里还没做。
在生产环境的 IoT 平台一般两套都有:
- 命令式用于”立即类”动作——重启、推送、告警弹窗;
- 影子期望式用于”配置类”状态——上报间隔、模式、阈值。
本项目的 SET_REPORT_INTERVAL 命令在领域语义上其实更适合影子期望式,上报间隔是”设备的配置状态”,离线设备上线后应该自动同步到新值。
注意,影子不在 command 上下文,而在 device 上下文中,03 篇里 DeviceShadow 是 Device 聚合的内部实体:
1
2
3
4
5
6
7
8
public record DeviceShadow(Long version, String document, Instant updatedAt) {
public static DeviceShadow create(String document, Instant updatedAt) {
return new DeviceShadow(1L, document, updatedAt);
}
public DeviceShadow update(String document, Instant updatedAt) {
return new DeviceShadow(version + 1, document, updatedAt);
}
}
如果未来加影子同步,应该是:
device上下文新增Device.changeDesired(document, updatedAt)方法;- 改完发
DeviceShadowDesiredChangedEvent; - 接入层(access)订阅这个事件,下推到 MQTT;
- 设备上线时主动 pull 影子(HTTP 或 MQTT 下行 topic)。
跟 iot-context-command 是两条独立的下行链路,各管各的。