当前位置: 首页 > 知识库问答 >
问题:

DLT消息消耗

郑博厚
2023-03-14

我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。

有没有简单的方法来进行设置?

因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。

// our configuration

 @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Object> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }


@Bean
  public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {

    List<Class<? extends Throwable>> throwableList = Arrays.asList(IllegalArgumentException.class,
        IllegalAccessException.class);

    return RetryTopicConfigurationBuilder
        .newInstance()
        .dltHandlerMethod(XYZ.class, "xyz")
        .exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
        .maxAttempts(retryAttempt)
        .notRetryOn(throwableList)
        .doNotAutoCreateRetryTopics()
        .listenerFactory(kafkaListenerContainerFactory())
        .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
        .create(template);
  }

共有1个答案

郝玄天
2023-03-14

只需删除。dltHandlerMethod(XYZ.class,“XYZ”)

编辑

这仍然会创建一个默认的DLT处理程序,它只会记录记录。

您可以在“其他”使用者中使用其他组,也可以手动启动容器(DLT容器除外)。

在KafkaListener上设置自动启动;然后添加此。。。

@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
    return args -> {
        registry.getListenerContainerIds().forEach(id -> {
            if (!id.endsWith("-dlt")) {
                registry.getListenerContainer(id).start();
            }
        });
    };
}
 类似资料:
  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 消息组件为你的App提供了可视评论和消息系统。 消息组件布局 ... <div class="page"> <div class="page-content messages-content"> <div class="messages"> <!-- 时间戳 --> <div class="messages-date">Sunday, Feb 9 <spa

  • Unread message count(未读消息统计) GET /user/counts 响应: Status: 200 OK { "user": { // 用户相关 "following": 1, // 用户关注者新增(粉丝新增)数量 "liked": 0, // 被点赞数 "commented": 0, // 被评论数 "system": 0,

  • 公众号消息分为 服务端被动回复消息 和 客服消息 两个场景。 需要注意的是两个场景的消息虽然类似,但是结构却有些差异,比如服务端使用 XML 结构,而客服消息使用 JSON 结构,且同样类似的消息类型,结构和名称都有些许差异,在使用时请勿混淆。 服务端消息结构 当你接收到用户发来的消息时,可能会提取消息中的相关属性,参考: 请求消息基本属性(以下所有消息都有的基本属性): - `ToUserN

  • 我把微信的 API 里的所有“消息”都按类型抽象出来了,也就是说,你不用区分它是回复消息还是主动推送消息,免去了你去手动拼装微信的 XML 以及乱七八糟命名不统一的 JSON 了。 在阅读以下内容时请忽略是 接收消息 还是 回复消息,后面我会给你讲它们的区别。 消息类型 消息分为以下几种:文本、图片、视频、声音、链接、坐标、图文、文章 和一种特殊的 原始消息。 另外还有一种特殊的消息类型:素材消息

  • 注:内容翻译自官网文档 Messages 给出一个简单的消息定义: message Foo {} protocol buffer 编译器生成名为 Foo 的类,实现 Message 接口。这个类被定义为 final, 不容许任何子类。Foo 继承自 GeneratedMessage, 但是这个可以认为是实现细节。默认, Foo 用为实现最大速度的特别版本来覆盖很多 GeneratedMessag