文章

12 mock-device 下篇:网关、跨语言协议适配

12 mock-device 下篇:网关、跨语言协议适配

本篇讲mock-device的另一半作用 LwM2M 网关,以及一个独立的 C 语言工程 mock-device-modbus

项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

目录

网关

LwM2M → MQTT 翻译层:Lwm2mDeviceSnapshot 是关键

网关下行命令:从 MQTT 翻译成 LwM2M Write

mock-device-modbus:用 C 模拟工业设备

协议适配、共享状态等

网关

设备世界的协议非常多MQTT、CoAP、LwM2M、Modbus、ZigBee、私有 TCP、行业总线PLC等,平台不可能为每种协议都建一条独立链路。网关的本质就是把“协议异质”在更靠近设备的位置抹平,把多种协议设备接入网关,网关用平台认识的协议(一般是 MQTT 或 HTTP)统一上报。

mock-device 工程同时演示了两种角色:

  • 设备角色,上篇讲的 MockDeviceTelemetryService,模拟一个直接讲 MQTT 的智能设备;
  • 网关角色GatewayServerService 等组件,作为 LwM2M server 接受 LwM2M 设备注册,把它们的数据翻译成 MQTT 上报。

第二种角色对应的下游设备是 mock-device-modbus,一个 C 写的工程,模拟一台只会 Modbus 和 LwM2M 的工业设备,必须经过网关才能把数据送到平台。

结构图

LwM2M → MQTT 翻译层:Lwm2mDeviceSnapshot

网关上行翻译的核心是把 LwM2M 这种资源寻址的协议,翻译成 MQTT 这种topic + payload的协议,两者抽象层不同:

  • LwM2M:每条上报是一个 (objectId, instanceId, resourceId, value) 四元组——例如 /3303/0/5700 表示”温度传感器对象 #0 的当前值”;
  • MQTT:每条上报是一个 (topic, payload) 二元组——例如 iot/{deviceId}/telemetry 上发一个 JSON。

直接把 LwM2M 上报的每条 resource 翻译成一条 MQTT 消息会有问题,温度和湿度是同一台设备的不同 resource,分开上报到 MQTT 后下游要自己拼起来,这个责任不该交给下游。

mock-device 网关的解法是引入一个领域聚合 Lwm2mDeviceSnapshot,以”设备”为粒度持有最新指标快照,攒齐了再发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public record Lwm2mDeviceSnapshot(
        String deviceId,
        BigDecimal temperature,
        BigDecimal humidity,
        OffsetDateTime updatedAt) {

    public boolean isComplete() {
        return temperature != null && humidity != null;
    }

    public Lwm2mDeviceSnapshot withTemperature(BigDecimal value, OffsetDateTime ts) {
        return new Lwm2mDeviceSnapshot(deviceId, value, humidity, ts);
    }

    public Lwm2mDeviceSnapshot withHumidity(BigDecimal value, OffsetDateTime ts) {
        return new Lwm2mDeviceSnapshot(deviceId, temperature, value, ts);
    }

    public GatewayUplinkMessage toGatewayMessage(String gatewayId) {
        return new GatewayUplinkMessage(
                deviceId,
                gatewayId,
                "lwm2m-gateway",
                temperature,
                humidity,
                updatedAt.toString());
    }
}

这个 record 体现了三件事:

1. 不可变 + with 风格更新

不像 backend 里的 TelemetrySnapshot.refreshBy(event) 一次合并一整个事件,Lwm2mDeviceSnapshot 是按字段增量构建的,withTemperaturewithHumidity 各自返回新对象。这样 LwM2M 服务器收到 temperature observe 时只调 withTemperature,不需要凑齐 humidity。

2. isComplete() 是发布前置条件

网关只有在快照”完整”时才向 MQTT 发,不然下游 telemetry 上下文会拒收(required=true 的指标缺失,整条上报会被 schema 校验抛掉,参考 04 篇)。

3. toGatewayMessage() 是协议翻译入口

这个方法把 LwM2M 领域对象翻译成 MQTT 上报的领域对象,GatewayUplinkMessage,加了一个 gatewayId 字段标记”这条数据来自哪个网关”。下游 access 拿到这个消息后能溯源到具体网关,这是合规和故障排查需要的字段。

转发器把这三件事编排起来

GatewayTelemetryForwarder 负责整个翻译流水线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class GatewayTelemetryForwarder implements Lwm2mServerHandler, AutoCloseable {

    private final Map<String, Lwm2mDeviceSnapshot> latestSnapshots;
    private final GatewayTelemetryPublishScheduler publishScheduler;
    private final GatewayTelemetryDeduplicator deduplicator;

    @Override
    public void onTelemetryReported(Lwm2mDeviceSnapshot snapshot) {
        latestSnapshots.put(snapshot.deviceId(), snapshot);
        if (!snapshot.isComplete()) {
            return;
        }
        publishScheduler.schedule(snapshot.deviceId(), () -> publishLatest(snapshot.deviceId()));
    }

    private void publishLatest(String deviceId) {
        Lwm2mDeviceSnapshot latest = latestSnapshots.get(deviceId);
        if (latest == null || !latest.isComplete()) {
            return;
        }
        GatewayUplinkMessage message = latest.toGatewayMessage(config.gatewayId());
        if (deduplicator.isDuplicate(message)) {
            return;
        }
        String topic = config.topicPattern().replace("{deviceId}", message.deviceId());
        try {
            String payload = objectMapper.writeValueAsString(message);
            mqttMessagePublisher.publish(topic, payload, config.mqttQos());
            deduplicator.markPublished(message);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("failed to serialize telemetry message", e);
        }
    }
}

它做了三件事:

  • latestSnapshots 按 deviceId 维护快照,收到 LwM2M observe 时更新;
  • publishScheduler 节流,同一设备 150ms 内的多次 observe 合并成一次发布,避免高频 LwM2M 触发高频 MQTT;
  • deduplicator 去重,5 秒内相同内容不重发,避免 LwM2M 偶发抖动产生重复。

这三件事都是网关层的职责,不应该上推到平台,平台只负责消费”已经处理好的”上报。

端口接口让协议可换

注意 GatewayTelemetryForwarder 实现的是 Lwm2mServerHandler 这个接口——LwM2M 服务器(具体是 LeshanLwm2mServer)通过它回调,并不直接知道有 MQTT 这件事。

1
2
3
4
5
public interface Lwm2mServerHandler {
    void onClientRegistered(String endpoint);
    void onClientUnregistered(String endpoint);
    void onTelemetryReported(Lwm2mDeviceSnapshot snapshot);
}

未来要换上行协议(比如改成 HTTP 上报、或者 Kafka 直连),只需要换 MqttMessagePublisher 实现,转发器和 LwM2M 服务器一行不动。

网关下行命令:从 MQTT 翻译成 LwM2M Write

现在看下行,平台想给 LwM2M 设备发”修改上报间隔”命令,这条链路在网关里怎么走呢?

GatewayServerService.processSetReportIntervalCommandPayload 是入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void processSetReportIntervalCommandPayload(SetReportIntervalCommandPayload command) {
    // 1. 协议层校验
    if (!"set_report_interval".equalsIgnoreCase(command.commandType())) {
        publishAck(... "FAILED", "unsupported command type");
        return;
    }
    if (command.params() == null || command.params().intervalMs() < 500) {
        publishAck(... "FAILED", "invalid interval");
        return;
    }

    try {
        // 2. 通过 LwM2M Write 下发到设备
        boolean delivered = lwm2MServerRuntime.setReportInterval(command);
        if (!delivered) {
            publishAck(... "FAILED", "device is offline or not registered");
            return;
        }

        // 3. 成功 ACK
        publishAck(... "SUCCESS", "interval changed to " + command.params().intervalMs());
    } catch (InvalidReportIntervalException e) {
        publishAck(... "FAILED", e.getMessage());
    }
}

下行复杂度比上行更高,因为它要处理三种失败状态:

  • 协议层失败(命令类型不对、参数不合法);
  • 设备不在线(LwM2M 没有这台设备的注册记录);
  • 设备拒绝(设备返回错误码)。

每种失败都要变成对平台的 ACK,成功就发 SUCCESS,失败就发 FAILED 带原因。这条 ACK 走的是上行 MQTT,所以网关的角色既是 LwM2M server(收设备数据 + 下发 LwM2M 命令)又是 MQTT client(向平台上报数据 + 上报 ACK)。

LwM2M Write 的具体形态

LeshanLwm2mServer.setReportInterval 把领域命令翻译成 LwM2M 协议层的写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public boolean setReportInterval(SetReportIntervalCommandPayload command)
        throws InvalidReportIntervalException {
    if (server == null) {
        throw new InvalidReportIntervalException("gateway server not started");
    }

    Registration registration = server.getRegistrationService().getByEndpoint(command.deviceId());
    if (registration == null) {
        return false;   // 设备未注册
    }

    WriteResponse response = server.send(
            registration,
            new WriteRequest(
                    REPORT_INTERVAL_OBJECT_ID,        // 31024
                    REPORT_INTERVAL_INSTANCE_ID,      // 0
                    REPORT_INTERVAL_RESOURCE_ID,      // 1
                    (long) command.params().intervalMs()),
            5000L);
    // ...
    return response.isSuccess();
}

注意几个细节:

1. 自定义 Object ID 31024

LwM2M 标准定义了一批 Object(3303 温度、3304 湿度、3 设备等),但“上报间隔”不在标准里。31024 是自定义 Object,这条上报间隔配置是平台和设备双方约定好的,不是 LwM2M 的标准。

2. 5 秒同步等待

server.send(..., 5000L) 是同步阻塞——网关等 5 秒看设备有没有响应,这对 application 服务来说是潜在阻塞点(占用线程),但对 LwM2M 这种”对端可能在弱网络里”的协议是合理的。

整条下行链路如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
平台发 MQTT 命令 (JSON)
    ↓
GatewayMqttCommandConsumer 收到,反序列化成 SetReportIntervalCommandPayload
    ↓
GatewayServerService 校验 + 调 lwm2MServerRuntime
    ↓
LeshanLwm2mServer 翻译成 LwM2M WriteRequest
    ↓
设备 (mock-device-modbus 在 C 那边)
    ↓
设备返回 WriteResponse
    ↓
逐层往回到 publishAck,发 MQTT ACK 给平台

mock-device-modbus:用 C 模拟工业设备

mock-device-modbus 是模拟一台 Modbus 和 LwM2M 协议的工业设备,它在 WSL 里跑,作为网关的下游设备。它按子模块分目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
mock-device-modbus/src/
├── main.c                              ← 启动入口
├── app/                                ← 应用编排
│   ├── app_config.{c,h}                ← 运行时配置加载
│   └── gateway_app.{c,h}               ← 主循环编排
├── telemetry/                          ← 设备状态
│   └── telemetry_state.{c,h}           ← 共享的遥测状态(带 mutex)
├── modbus/                             ← Modbus 协议适配
│   ├── modbus_simulator.{c,h}          ← Modbus slave(被采集端)
│   └── modbus_collector.{c,h}          ← Modbus master(采集端)
└── lwm2m/                              ← LwM2M 协议适配
    ├── anjay_adapter.{c,h}             ← anjay LwM2M 客户端
    └── report_interval_object.{c,h}    ← 自定义 Object 31024 处理

目录分层比较清楚简单:

  • app/ ,启动、配置、主循环;
  • telemetry/ ,跨模块共享的设备状态;
  • modbus/ ,一种协议适配(对内设备总线);
  • lwm2m/ ,另一种协议适配(对网关上行)。

gateway_app_run:装配 + 主循环

app/gateway_app.cgateway_app_run 函数承担”装配车间”的职责:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int gateway_app_run(void) {
    app_config_t config = app_config_load();
    telemetry_state_t telemetry_state;
    modbus_simulator_t simulator;
    modbus_collector_t collector;
    anjay_adapter_t lwm2m_client;

    setvbuf(stdout, NULL, _IOLBF, 0);
    setvbuf(stderr, NULL, _IONBF, 0);

    telemetry_state_init(&telemetry_state, config.device_id, config.poll_interval_ms);

    if (modbus_simulator_init(&simulator, &config)
            || modbus_collector_init(&collector, &config, &telemetry_state, &simulator)
            || anjay_adapter_init(&lwm2m_client, &config, &telemetry_state)) {
        fprintf(stderr, "mock-device-modbus init failed\n");
        return EXIT_FAILURE;
    }

    if (modbus_simulator_start(&simulator) || anjay_adapter_start(&lwm2m_client)) {
        // 启动失败 + 资源清理
        return EXIT_FAILURE;
    }

    for (;;) {
        modbus_simulator_tick(&simulator);
        modbus_collector_poll(&collector);
        anjay_adapter_step(&lwm2m_client);
        // 按 telemetry_state 里当前的 report_interval 睡眠
        // ...
    }
}

协议适配、共享状态等

1. telemetry_state 是”共享状态聚合”

telemetry/telemetry_state.h 是整个工程里唯一被多个模块同时读写的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct telemetry_state {
    const char *device_id;
    double temperature;
    double humidity;
    int report_interval_ms;
    unsigned long version;
    pthread_mutex_t mutex;
} telemetry_state_t;

void telemetry_state_init(telemetry_state_t *state, const char *device_id, int report_interval_ms);
void telemetry_state_update(telemetry_state_t *state, double temperature, double humidity);
void telemetry_state_set_report_interval(telemetry_state_t *state, int report_interval_ms);
int telemetry_state_get_report_interval(const telemetry_state_t *state);
void telemetry_state_get_snapshot(const telemetry_state_t *state,
                                  double *temperature,
                                  double *humidity,
                                  unsigned long *version);

它在工程里扮演几个角色:

  • 被 modbus 采集器写modbus_collector_poll 读到新值后调 telemetry_state_update
  • 被 LwM2M 适配器读anjay_adapter_step 读最新值,作为 LwM2M Resource 上报到网关;
  • 被下行命令写,LwM2M Write 把 report_interval_object 接收到的新间隔调 telemetry_state_set_report_interval
  • 被主循环读,决定下一轮 nanosleep 多久。

读写集中在一个 struct + 自带 mutex,这是 C 工程里”聚合 + 不变量保护”的常见形态pthread_mutex_t 字段嵌在 struct 里,调用方不用关心锁的存在,所有 set/get 函数内部加锁。

2. modbus 和 lwm2m 是两道协议适配

modbus/lwm2m/ 两个目录各自封装一种协议:

  • modbus/ 用 libmodbus 跟下游设备讲 Modbus TCP;
  • lwm2m/ 用 anjay 跟上游网关讲 LwM2M(CoAP/UDP)。

两边都不知道对方的存在,它们都只跟 telemetry_state 打交道:

1
2
3
4
5
[modbus_collector]  →写→  telemetry_state  ←读←  [anjay_adapter]
                                  ↑
                              (下行 LwM2M)
                                  ↓
                          [report_interval_object] →写→ telemetry_state

这条流向图的核心是协议适配模块之间不直接通信,全部通过共享状态间接交互。如果未来要换协议(比如 modbus 换成 OPC UA、lwm2m 换成 MQTT),只需要改对应目录里的代码,telemetry_state 不用动、应用主循环不用动。

3. anjay_adapter 实现 LwM2M

LwM2M 协议本身很复杂,客户端注册、observe、bootstrap、anjay 库自身的事件循环,这些细节全部封装在 lwm2m/anjay_adapter.{c,h} 里,对外只暴露三个函数:

1
2
3
int anjay_adapter_init(anjay_adapter_t *self, const app_config_t *config, telemetry_state_t *state);
int anjay_adapter_start(anjay_adapter_t *self);
void anjay_adapter_step(anjay_adapter_t *self);

应用层(gateway_app)调这三个函数,完全不需要掌握 anjay 内部的复杂调用。


项目地址:https://github.com/Liyuwen85/iot-alarm-copilot

本文由作者按照 CC BY 4.0 进行授权