12 mock-device 下篇:网关、跨语言协议适配
本篇讲mock-device的另一半作用 LwM2M 网关,以及一个独立的 C 语言工程 mock-device-modbus。
目录
网关
LwM2M → MQTT 翻译层:Lwm2mDeviceSnapshot 是关键
网关下行命令:从 MQTT 翻译成 LwM2M Write
mock-device-modbus:用 C 模拟工业设备
协议适配、共享状态等
网关
设备世界的协议非常多MQTT、CoAP、LwM2M、Modbus、ZigBee、私有 TCP、行业总线PLC等,平台不可能为每种协议都建一条独立链路。网关的本质就是把“协议异质”在更靠近设备的位置抹平,把多种协议设备接入网关,网关用平台认识的协议(一般是 MQTT 或 HTTP)统一上报。
mock-device 工程同时演示了两种角色:
- 设备角色,上篇讲的
MockDeviceTelemetryService,模拟一个直接讲 MQTT 的智能设备; - 网关角色,
GatewayServerService等组件,作为 LwM2M server 接受 LwM2M 设备注册,把它们的数据翻译成 MQTT 上报。
第二种角色对应的下游设备是 mock-device-modbus,一个 C 写的工程,模拟一台只会 Modbus 和 LwM2M 的工业设备,必须经过网关才能把数据送到平台。
LwM2M → MQTT 翻译层:Lwm2mDeviceSnapshot
网关上行翻译的核心是把 LwM2M 这种资源寻址的协议,翻译成 MQTT 这种topic + payload的协议,两者抽象层不同:
- LwM2M:每条上报是一个
(objectId, instanceId, resourceId, value)四元组——例如/3303/0/5700表示”温度传感器对象 #0 的当前值”; - MQTT:每条上报是一个
(topic, payload)二元组——例如iot/{deviceId}/telemetry上发一个 JSON。
直接把 LwM2M 上报的每条 resource 翻译成一条 MQTT 消息会有问题,温度和湿度是同一台设备的不同 resource,分开上报到 MQTT 后下游要自己拼起来,这个责任不该交给下游。
mock-device 网关的解法是引入一个领域聚合 Lwm2mDeviceSnapshot,以”设备”为粒度持有最新指标快照,攒齐了再发:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public record Lwm2mDeviceSnapshot(
String deviceId,
BigDecimal temperature,
BigDecimal humidity,
OffsetDateTime updatedAt) {
public boolean isComplete() {
return temperature != null && humidity != null;
}
public Lwm2mDeviceSnapshot withTemperature(BigDecimal value, OffsetDateTime ts) {
return new Lwm2mDeviceSnapshot(deviceId, value, humidity, ts);
}
public Lwm2mDeviceSnapshot withHumidity(BigDecimal value, OffsetDateTime ts) {
return new Lwm2mDeviceSnapshot(deviceId, temperature, value, ts);
}
public GatewayUplinkMessage toGatewayMessage(String gatewayId) {
return new GatewayUplinkMessage(
deviceId,
gatewayId,
"lwm2m-gateway",
temperature,
humidity,
updatedAt.toString());
}
}
这个 record 体现了三件事:
1. 不可变 + with 风格更新
不像 backend 里的 TelemetrySnapshot.refreshBy(event) 一次合并一整个事件,Lwm2mDeviceSnapshot 是按字段增量构建的,withTemperature 和 withHumidity 各自返回新对象。这样 LwM2M 服务器收到 temperature observe 时只调 withTemperature,不需要凑齐 humidity。
2. isComplete() 是发布前置条件
网关只有在快照”完整”时才向 MQTT 发,不然下游 telemetry 上下文会拒收(required=true 的指标缺失,整条上报会被 schema 校验抛掉,参考 04 篇)。
3. toGatewayMessage() 是协议翻译入口
这个方法把 LwM2M 领域对象翻译成 MQTT 上报的领域对象,GatewayUplinkMessage,加了一个 gatewayId 字段标记”这条数据来自哪个网关”。下游 access 拿到这个消息后能溯源到具体网关,这是合规和故障排查需要的字段。
转发器把这三件事编排起来
GatewayTelemetryForwarder 负责整个翻译流水线:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class GatewayTelemetryForwarder implements Lwm2mServerHandler, AutoCloseable {
private final Map<String, Lwm2mDeviceSnapshot> latestSnapshots;
private final GatewayTelemetryPublishScheduler publishScheduler;
private final GatewayTelemetryDeduplicator deduplicator;
@Override
public void onTelemetryReported(Lwm2mDeviceSnapshot snapshot) {
latestSnapshots.put(snapshot.deviceId(), snapshot);
if (!snapshot.isComplete()) {
return;
}
publishScheduler.schedule(snapshot.deviceId(), () -> publishLatest(snapshot.deviceId()));
}
private void publishLatest(String deviceId) {
Lwm2mDeviceSnapshot latest = latestSnapshots.get(deviceId);
if (latest == null || !latest.isComplete()) {
return;
}
GatewayUplinkMessage message = latest.toGatewayMessage(config.gatewayId());
if (deduplicator.isDuplicate(message)) {
return;
}
String topic = config.topicPattern().replace("{deviceId}", message.deviceId());
try {
String payload = objectMapper.writeValueAsString(message);
mqttMessagePublisher.publish(topic, payload, config.mqttQos());
deduplicator.markPublished(message);
} catch (JsonProcessingException e) {
throw new IllegalStateException("failed to serialize telemetry message", e);
}
}
}
它做了三件事:
latestSnapshots按 deviceId 维护快照,收到 LwM2M observe 时更新;publishScheduler节流,同一设备 150ms 内的多次 observe 合并成一次发布,避免高频 LwM2M 触发高频 MQTT;deduplicator去重,5 秒内相同内容不重发,避免 LwM2M 偶发抖动产生重复。
这三件事都是网关层的职责,不应该上推到平台,平台只负责消费”已经处理好的”上报。
端口接口让协议可换
注意 GatewayTelemetryForwarder 实现的是 Lwm2mServerHandler 这个接口——LwM2M 服务器(具体是 LeshanLwm2mServer)通过它回调,并不直接知道有 MQTT 这件事。
1
2
3
4
5
public interface Lwm2mServerHandler {
void onClientRegistered(String endpoint);
void onClientUnregistered(String endpoint);
void onTelemetryReported(Lwm2mDeviceSnapshot snapshot);
}
未来要换上行协议(比如改成 HTTP 上报、或者 Kafka 直连),只需要换 MqttMessagePublisher 实现,转发器和 LwM2M 服务器一行不动。
网关下行命令:从 MQTT 翻译成 LwM2M Write
现在看下行,平台想给 LwM2M 设备发”修改上报间隔”命令,这条链路在网关里怎么走呢?
GatewayServerService.processSetReportIntervalCommandPayload 是入口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void processSetReportIntervalCommandPayload(SetReportIntervalCommandPayload command) {
// 1. 协议层校验
if (!"set_report_interval".equalsIgnoreCase(command.commandType())) {
publishAck(... "FAILED", "unsupported command type");
return;
}
if (command.params() == null || command.params().intervalMs() < 500) {
publishAck(... "FAILED", "invalid interval");
return;
}
try {
// 2. 通过 LwM2M Write 下发到设备
boolean delivered = lwm2MServerRuntime.setReportInterval(command);
if (!delivered) {
publishAck(... "FAILED", "device is offline or not registered");
return;
}
// 3. 成功 ACK
publishAck(... "SUCCESS", "interval changed to " + command.params().intervalMs());
} catch (InvalidReportIntervalException e) {
publishAck(... "FAILED", e.getMessage());
}
}
下行复杂度比上行更高,因为它要处理三种失败状态:
- 协议层失败(命令类型不对、参数不合法);
- 设备不在线(LwM2M 没有这台设备的注册记录);
- 设备拒绝(设备返回错误码)。
每种失败都要变成对平台的 ACK,成功就发 SUCCESS,失败就发 FAILED 带原因。这条 ACK 走的是上行 MQTT,所以网关的角色既是 LwM2M server(收设备数据 + 下发 LwM2M 命令)又是 MQTT client(向平台上报数据 + 上报 ACK)。
LwM2M Write 的具体形态
LeshanLwm2mServer.setReportInterval 把领域命令翻译成 LwM2M 协议层的写操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public boolean setReportInterval(SetReportIntervalCommandPayload command)
throws InvalidReportIntervalException {
if (server == null) {
throw new InvalidReportIntervalException("gateway server not started");
}
Registration registration = server.getRegistrationService().getByEndpoint(command.deviceId());
if (registration == null) {
return false; // 设备未注册
}
WriteResponse response = server.send(
registration,
new WriteRequest(
REPORT_INTERVAL_OBJECT_ID, // 31024
REPORT_INTERVAL_INSTANCE_ID, // 0
REPORT_INTERVAL_RESOURCE_ID, // 1
(long) command.params().intervalMs()),
5000L);
// ...
return response.isSuccess();
}
注意几个细节:
1. 自定义 Object ID 31024
LwM2M 标准定义了一批 Object(3303 温度、3304 湿度、3 设备等),但“上报间隔”不在标准里。31024 是自定义 Object,这条上报间隔配置是平台和设备双方约定好的,不是 LwM2M 的标准。
2. 5 秒同步等待
server.send(..., 5000L) 是同步阻塞——网关等 5 秒看设备有没有响应,这对 application 服务来说是潜在阻塞点(占用线程),但对 LwM2M 这种”对端可能在弱网络里”的协议是合理的。
整条下行链路如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
平台发 MQTT 命令 (JSON)
↓
GatewayMqttCommandConsumer 收到,反序列化成 SetReportIntervalCommandPayload
↓
GatewayServerService 校验 + 调 lwm2MServerRuntime
↓
LeshanLwm2mServer 翻译成 LwM2M WriteRequest
↓
设备 (mock-device-modbus 在 C 那边)
↓
设备返回 WriteResponse
↓
逐层往回到 publishAck,发 MQTT ACK 给平台
mock-device-modbus:用 C 模拟工业设备
mock-device-modbus 是模拟一台 Modbus 和 LwM2M 协议的工业设备,它在 WSL 里跑,作为网关的下游设备。它按子模块分目录:
1
2
3
4
5
6
7
8
9
10
11
12
13
mock-device-modbus/src/
├── main.c ← 启动入口
├── app/ ← 应用编排
│ ├── app_config.{c,h} ← 运行时配置加载
│ └── gateway_app.{c,h} ← 主循环编排
├── telemetry/ ← 设备状态
│ └── telemetry_state.{c,h} ← 共享的遥测状态(带 mutex)
├── modbus/ ← Modbus 协议适配
│ ├── modbus_simulator.{c,h} ← Modbus slave(被采集端)
│ └── modbus_collector.{c,h} ← Modbus master(采集端)
└── lwm2m/ ← LwM2M 协议适配
├── anjay_adapter.{c,h} ← anjay LwM2M 客户端
└── report_interval_object.{c,h} ← 自定义 Object 31024 处理
目录分层比较清楚简单:
app/,启动、配置、主循环;telemetry/,跨模块共享的设备状态;modbus/,一种协议适配(对内设备总线);lwm2m/,另一种协议适配(对网关上行)。
gateway_app_run:装配 + 主循环
app/gateway_app.c 里 gateway_app_run 函数承担”装配车间”的职责:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int gateway_app_run(void) {
app_config_t config = app_config_load();
telemetry_state_t telemetry_state;
modbus_simulator_t simulator;
modbus_collector_t collector;
anjay_adapter_t lwm2m_client;
setvbuf(stdout, NULL, _IOLBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
telemetry_state_init(&telemetry_state, config.device_id, config.poll_interval_ms);
if (modbus_simulator_init(&simulator, &config)
|| modbus_collector_init(&collector, &config, &telemetry_state, &simulator)
|| anjay_adapter_init(&lwm2m_client, &config, &telemetry_state)) {
fprintf(stderr, "mock-device-modbus init failed\n");
return EXIT_FAILURE;
}
if (modbus_simulator_start(&simulator) || anjay_adapter_start(&lwm2m_client)) {
// 启动失败 + 资源清理
return EXIT_FAILURE;
}
for (;;) {
modbus_simulator_tick(&simulator);
modbus_collector_poll(&collector);
anjay_adapter_step(&lwm2m_client);
// 按 telemetry_state 里当前的 report_interval 睡眠
// ...
}
}
协议适配、共享状态等
1. telemetry_state 是”共享状态聚合”
telemetry/telemetry_state.h 是整个工程里唯一被多个模块同时读写的对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct telemetry_state {
const char *device_id;
double temperature;
double humidity;
int report_interval_ms;
unsigned long version;
pthread_mutex_t mutex;
} telemetry_state_t;
void telemetry_state_init(telemetry_state_t *state, const char *device_id, int report_interval_ms);
void telemetry_state_update(telemetry_state_t *state, double temperature, double humidity);
void telemetry_state_set_report_interval(telemetry_state_t *state, int report_interval_ms);
int telemetry_state_get_report_interval(const telemetry_state_t *state);
void telemetry_state_get_snapshot(const telemetry_state_t *state,
double *temperature,
double *humidity,
unsigned long *version);
它在工程里扮演几个角色:
- 被 modbus 采集器写,
modbus_collector_poll读到新值后调telemetry_state_update; - 被 LwM2M 适配器读,
anjay_adapter_step读最新值,作为 LwM2M Resource 上报到网关; - 被下行命令写,LwM2M Write 把
report_interval_object接收到的新间隔调telemetry_state_set_report_interval; - 被主循环读,决定下一轮
nanosleep多久。
读写集中在一个 struct + 自带 mutex,这是 C 工程里”聚合 + 不变量保护”的常见形态。pthread_mutex_t 字段嵌在 struct 里,调用方不用关心锁的存在,所有 set/get 函数内部加锁。
2. modbus 和 lwm2m 是两道协议适配
modbus/ 和 lwm2m/ 两个目录各自封装一种协议:
modbus/用 libmodbus 跟下游设备讲 Modbus TCP;lwm2m/用 anjay 跟上游网关讲 LwM2M(CoAP/UDP)。
两边都不知道对方的存在,它们都只跟 telemetry_state 打交道:
1
2
3
4
5
[modbus_collector] →写→ telemetry_state ←读← [anjay_adapter]
↑
(下行 LwM2M)
↓
[report_interval_object] →写→ telemetry_state
这条流向图的核心是协议适配模块之间不直接通信,全部通过共享状态间接交互。如果未来要换协议(比如 modbus 换成 OPC UA、lwm2m 换成 MQTT),只需要改对应目录里的代码,telemetry_state 不用动、应用主循环不用动。
3. anjay_adapter 实现 LwM2M
LwM2M 协议本身很复杂,客户端注册、observe、bootstrap、anjay 库自身的事件循环,这些细节全部封装在 lwm2m/anjay_adapter.{c,h} 里,对外只暴露三个函数:
1
2
3
int anjay_adapter_init(anjay_adapter_t *self, const app_config_t *config, telemetry_state_t *state);
int anjay_adapter_start(anjay_adapter_t *self);
void anjay_adapter_step(anjay_adapter_t *self);
应用层(gateway_app)调这三个函数,完全不需要掌握 anjay 内部的复杂调用。
