02 用死信队列处理遥测失败消息
本文聚焦于上行链路中最前面的两个上下文access 和 telemetry,以及一个具体经验:遥测消息处理失败时,为什么不直接用 Kafka 自带的死信topic,而要再做一层数据库死信聚合呢?
目录
access 和 telemetry 的关系
Kafka 死信 topic 和数据库死信聚合并存
生产环境中最佳实践
几个容易踩的坑
access 和 telemetry 的关系
在一些项目里”接入”和”遥测”是合并在一起的,从接收上报、解析payload、写库、到提交发布事件等,都在同一个模块中,本项目中是拆分的,具体可以参考上一篇“iot-alarm-copilot项目介绍”中的限界上下文划分章节。这里对两者的功能做简单的阐述,具体可阅读代码。
一、access 的职责:把外部协议翻译成内部命令
iot-context-access 是上行链路的后端入口,它面对的是不可信的外部环境,设备厂家、协议、broker等;职责是把任意协议、任意 broker 上报的字节流,翻译成核心域能消费的领域数据。
access 内部分为:
- 协议消费,
interfaces/kafka和interfaces/mqtt两个适配器,前者是正式入口,后者用于本地测试; - topic 解析,
TelemetryTopic值对象负责把iot/{deviceId}/telemetry这种字符串拆出 deviceId; - 接入许可校验,通过端口调 device 上下文,确认这台设备被允许上报;
- payload 解析,根据设备遥测模型把 JSON 解析成结构化字段。
注意 access 自己没有任何主数据,设备模型来自 device 上下文,遥测落库去 telemetry 上下文,它只做”翻译和转发”。
二、telemetry 的职责:把原始字段变成平台标准化指标
telemetry 上下文负责的是把”原始上报字段”变成”平台标准化指标”。
telemetry 的核心抽象是 TelemetrySchema,它定义了平台有哪些指标、单位是什么、是否必填、如何衍生计算。每一条上报进来,telemetry 都按 schema 进行校验、转换、补齐衍生指标,最后落库成一条 TelemetryEvent记录,并对外发布 TelemetryRecordedEvent事件,让下游领域(rule、alarm、ai)订阅。
三、access 和 telemetry 怎么衔接
access 在 ingestTelemetry 的最后一步直接调 telemetry 的应用服务:
1
2
3
4
5
telemetryIngestApplicationService.record(new RecordTelemetryCommand(
telemetryPayload.deviceId(),
telemetryPayload.metrics(),
telemetryPayload.reportedAt(),
telemetryPayload.rawJson()));
telemetry 落库之后通过 ApplicationEventPublisher 发布 TelemetryRecordedEvent,rule 等下游上下文用 @EventListener订阅。这是单体形态下的简化做法,同一进程、同一事务。
未来微服务化以后,这里会被替换成Outbox表事务,并发送Kafka消息;access 和 telemetry 各自独立服务运行即可。
Kafka 死信 topic 和数据库死信聚合
在生产环境中,设备上报的格式可能错、没注册、payload数据被中间链路截断、telemetry 的 schema 可能缺字段等等,每一种失败都会让 access 抛异常,Kafka 消费失败。
最常见的做法是用 Spring Kafka 自带的 DeadLetterPublishingRecoverer重试若干次,如果任然失败,就把消息发到DLT (Kafka中叫Dead Letter Topic,与Queue实际作用基本一致) 。这种方案简单也常用,但这只解决了”消息不丢”,没有解决”失败可观察、可治理”。本项目的做法是:Kafka DLT + 数据库死信聚合两者并存。
一、Kafka DLT 解决”消息不丢”
先看 Kafka 的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public ConsumerRecordRecoverer kafkaAccessDeadLetterRecoverer(
KafkaOperations<Object, Object> kafkaOperations,
KafkaAccessProperties properties,
AccessDeadLetterCaptureApplicationService captureApplicationService,
ObjectMapper objectMapper) {
// 创建死信恢复者
DeadLetterPublishingRecoverer delegate = new DeadLetterPublishingRecoverer(
kafkaOperations,
(record, exception) -> new TopicPartition(properties.resolveDeadLetterTopic(), record.partition()));
// 配置发送行为
delegate.setFailIfSendResultIsError(true); // 发送失败时抛出异常
delegate.setWaitForSendResultTimeout(Duration.ofSeconds(5)); // 等待发送结果超时
// 包装成自定义恢复器(存入数据库)
return new KafkaAccessDeadLetterRecoverer(properties, captureApplicationService, delegate, objectMapper);
}
错误处理器配了指数退避重试 + 不可重试异常列表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 指数退避重试
ExponentialBackOffWithMaxRetries backOff =
new ExponentialBackOffWithMaxRetries(retry.getMaxRetries());
backOff.setInitialInterval(retry.getInitialIntervalMs()); // 初始间隔: 1000ms
backOff.setMultiplier(retry.getMultiplier()); // 退避倍数: 2.0
backOff.setMaxInterval(retry.getMaxIntervalMs()); // 最大间隔: 10000ms
// 不可重试异常列表
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
errorHandler.setCommitRecovered(true);
errorHandler.addNotRetryableExceptions(
BaseDomainException.class, // 领域业务异常(如校验失败)
IllegalArgumentException.class,
ClassCastException.class,
DeserializationException.class);
可查看 KafkaAccessConfiguration.java 源码
几个关键点:
BaseDomainException直接进 DLT,不重试,领域异常一定是数据本身的问题,重试 10 次也是同一个异常,没必要;DeserializationException也不重试,payload 解析失败属于格式错误,重试无意义;- 网络抖动、数据库瞬时不可用等”基础设施异常”会重试,按指数退避策略;
setCommitRecovered(true)保证消息进入 DLT 后 offset 立刻提交,避免重复消费。
这些都是 Spring Kafka 的标准用法,解决消息不丢失问题。
二、Kafka DLT 解决不了的几个问题
失败消息只进DLT,运营上会遇到几个问题:
1. DLT 里的消息没有”业务身份”,DLT 里存的是 ConsumerRecord<byte[], byte[]>,运营看到的是 partition、offset、payload bytes。要回答”今天哪台设备最常失败”等业务问题,得先把 payload 反序列化、再从 MQTT topic 里拆出 deviceId,每查一次都要重新解一次;
2. DLT 不方便交叉查询,业务上常见的问题是”过去 24 小时内 v1.2 固件的某型号设备失败次数”,这种带筛选条件的查询,DLT 做不了,得放到数仓或ES中才可用;
3. DLT 的失败原因丢失了,DLT消息里可以塞 header 带异常类型,但异常的因果链、根因消息、抛出位置这些上下文信息会被压成一行字符串,事后追溯很困难。
DLT 的设计目标是“消息暂存”,无“失败可治理”业务语义。
三、数据库死信聚合解决“失败可治理”
本项目在 access 上下文里专门建了一个领域聚合 AccessDeadLetterLog,每次 Kafka 重试用完后,进入 DLT 时,同时往这张表里写一条记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public record AccessDeadLetterLog(
Long id,
String deadLetterTopic,
String originalTopic,
Integer originalPartition,
Long originalOffset,
String consumerGroup,
String mqttTopic,
String deviceId,
String payload,
String exceptionType,
String exceptionMessage,
Instant failedAt,
Instant createdAt) { ... }
每一个字段都有具体含义:
deviceId解析自 MQTT topic,支持”按设备查失败”;originalPartition+originalOffset+consumerGroup,能精确定位 DLT 里的原始消息,方便回放;mqttTopic,保留原始 topic,遇到 topic 规则演进时也能追溯;exceptionType+exceptionMessage,失败原因结构化,可以做聚合统计;failedAt和createdAt分开,前者是失败发生时间,后者是入库时间,区分清楚便于排查延迟。
实现时用一个自定义 recoverer,来包装 Spring 自带的 DLT recoverer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
captureBestEffort(record, exception); // 先持久化到数据库
delegate.accept(record, exception); // 再发到 Kafka DLT
}
private Throwable mostSpecificCause(Throwable exception) {
Throwable candidate = exception;
while (candidate.getCause() != null && candidate.getCause() != candidate) {
candidate = candidate.getCause();
}
return candidate;
}
private KafkaTelemetryEnvelope resolveEnvelope(Object value) {
... ...
}
可查看 KafkaAccessDeadLetterRecoverer.java 源码
注意几个细节:
captureBestEffort抛异常不会中断 DLT 发送,数据库写失败时记一条 error 日志,但不影响消息继续往 DLT 走;mostSpecificCause取异常的根因,而不是顶层包装异常。这里exceptionMessage才是真正有用的根因信息;resolveEnvelope容错解析,如果 payload 已经被反序列化失败,仍然把能拿到的字段写进去,不让”反序列化失败”导致丢掉死信记录。
四、两层职责的清晰分工
| 职责 | 承担方 |
|---|---|
| 消息不丢、可重放 | Kafka DLT |
| 重试策略、退避、可重试异常分类 | Spring Kafka 的 DefaultErrorHandler |
| 失败的业务身份(deviceId、mqttTopic) | 数据库死信聚合 |
| 失败原因结构化、可聚合统计 | 数据库死信聚合 |
| 运营查询和后台展示 | 数据库死信聚合 |
| 失败回放(取出原消息重发) | Kafka DLT 提供原始字节,数据库聚合提供定位坐标 |
二者并不重复,Kafka DLT是”字节级别的暂存“,数据库死信聚合是”业务级别的台账“。以下是完整的消息处理流程示意图
生产环境中最佳实践
如果项目对死信消息治理要求比较高,如交易场景,可以参考如下,生产级死信处理流程来设计实现(后面可以单独一篇来讲)。
一个容易踩的坑
数据库写死信记录不能放在原事务里
很自然的想法是”消息处理失败时回滚事务,并把死信记录一起提交”,但死信写入必须脱离原事务,否则原事务回滚时死信记录也会被回滚掉。本项目里 AccessDeadLetterCaptureApplicationService 用了 REQUIRES_NEW 隔离,确保它有独立的事务边界。