我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。
有没有简单的方法来进行设置?
因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。
// our configuration
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
List<Class<? extends Throwable>> throwableList = Arrays.asList(IllegalArgumentException.class,
IllegalAccessException.class);
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod(XYZ.class, "xyz")
.exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
.maxAttempts(retryAttempt)
.notRetryOn(throwableList)
.doNotAutoCreateRetryTopics()
.listenerFactory(kafkaListenerContainerFactory())
.setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
.create(template);
}
只需删除。dltHandlerMethod(XYZ.class,“XYZ”)
。
编辑
这仍然会创建一个默认的DLT处理程序,它只会记录记录。
您可以在“其他”使用者中使用其他组,也可以手动启动容器(DLT容器除外)。
在KafkaListener上设置自动启动;然后添加此。。。
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
return args -> {
registry.getListenerContainerIds().forEach(id -> {
if (!id.endsWith("-dlt")) {
registry.getListenerContainer(id).start();
}
});
};
}
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
消息组件为你的App提供了可视评论和消息系统。 消息组件布局 ... <div class="page"> <div class="page-content messages-content"> <div class="messages"> <!-- 时间戳 --> <div class="messages-date">Sunday, Feb 9 <spa
Unread message count(未读消息统计) GET /user/counts 响应: Status: 200 OK { "user": { // 用户相关 "following": 1, // 用户关注者新增(粉丝新增)数量 "liked": 0, // 被点赞数 "commented": 0, // 被评论数 "system": 0,
公众号消息分为 服务端被动回复消息 和 客服消息 两个场景。 需要注意的是两个场景的消息虽然类似,但是结构却有些差异,比如服务端使用 XML 结构,而客服消息使用 JSON 结构,且同样类似的消息类型,结构和名称都有些许差异,在使用时请勿混淆。 服务端消息结构 当你接收到用户发来的消息时,可能会提取消息中的相关属性,参考: 请求消息基本属性(以下所有消息都有的基本属性): - `ToUserN
我把微信的 API 里的所有“消息”都按类型抽象出来了,也就是说,你不用区分它是回复消息还是主动推送消息,免去了你去手动拼装微信的 XML 以及乱七八糟命名不统一的 JSON 了。 在阅读以下内容时请忽略是 接收消息 还是 回复消息,后面我会给你讲它们的区别。 消息类型 消息分为以下几种:文本、图片、视频、声音、链接、坐标、图文、文章 和一种特殊的 原始消息。 另外还有一种特殊的消息类型:素材消息
注:内容翻译自官网文档 Messages 给出一个简单的消息定义: message Foo {} protocol buffer 编译器生成名为 Foo 的类,实现 Message 接口。这个类被定义为 final, 不容许任何子类。Foo 继承自 GeneratedMessage, 但是这个可以认为是实现细节。默认, Foo 用为实现最大速度的特别版本来覆盖很多 GeneratedMessag