当前位置: 首页 > 知识库问答 >
问题:

如何在使用Spring Kafka非阻塞重试向DLT发送消息时添加自定义标头?

湛文乐
2023-03-14

我正在使用具有非阻塞重试的SpringKafka2.8.0。我使用的是具有固定回退和死信主题的单个重试主题。这是我的配置:

@Bean
public RetryTopicConfiguration receiptRetryConfiguration(
        KafkaTemplate<String, byte[]> kafkaTemplate,
        ConcurrentKafkaListenerContainerFactory<String, byte[]> retryListenerContainerFactory
) {
    return RetryTopicConfigurationBuilder.newInstance()
            .maxAttempts(3)
            .fixedBackOff(5000)
            .setTopicSuffixingStrategy(SUFFIX_WITH_INDEX_VALUE)
            .useSingleTopicForFixedDelays()
            .doNotAutoCreateRetryTopics()
            .dltHandlerMethod(
                    new EndpointHandlerMethod(ConsumerErrorsHandler.class, "handleError")
            )
            .notRetryOn(List.of(SerializationException.class, EmptyMessageValueException.class))
            .listenerFactory(retryListenerContainerFactory)
            .create(kafkaTemplate);
}

我正在尝试实现失败标头管理,并将自定义error_code标头添加到发送到 DLT 的邮件中:

@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
        dlpr.setHeadersFunction((record, exception) ->
                new RecordHeaders().add("error_code", getErrorCode(exception).getBytes(StandardCharsets.UTF_8))
        );
    });
    return factory;
}

它可以工作,但调用dlpr.setHeadersFunction(…)将禁用添加重试标头的标准机制:retry_topic-timestsretry_topic-original-timstamp

除了retry_topic-*标头之外,还有什么方法可以将自定义标头添加到发送到DLT的消息中吗?

是否可以将死信发布恢复器工厂配置为仅针对发送到 DLT 主题的消息,而不是为重试主题配置?


共有1个答案

许华清
2023-03-14

请打开新功能问题;我认为我们需要添加一个<code>getHeadersFunction()</code>,这样如果复合函数已经存在,就可以构建它(或者我们应该添加一个 方法)。<="">

https://github.com/spring-projects/spring-kafka/issues

现在,您必须使用反射(例如直接字段访问器)来获取对现有函数(标头函数字段)的引用。

 类似资料:
  • 我是springboot kafka的新手,我在这篇文章后面创建了一个例子。 https://www.codenotfound.com/spring-kafka-boot-example.html 我发现您可以将kafka元数据设置为标题,但这不符合我的目的。 我能做到这一点吗?如果可能的话,我很感激你能分享一个例子。

  • 问题内容: 我正在编写一个服务器/客户端程序,客户端将文本消息发送到服务器。我使用了非阻塞I / O(NIO API),但是服务器上的消息无法正确显示。这是我在服务器上的代码: 这是一段客户代码: 在运行时,当客户端向服务器发送消息时,将显示空格字符或一条消息。 问题答案: 您需要在之前和之后。 注意在返回-1时循环并没有任何意义。为了天堂,这意味着同伴断开连接。

  • /** 自定义消息发送接口(目前只支持文本消息发送) @param chatController 聊窗实体(必须是小能SDK创建的聊窗实体NtalkerChatController类型) @param type 消息类型: 11:文本消息 12:图片消息 13:语音消息 14:视频消息 @param message 消息内容 */ NSString *str = @“自定义文本消息test”;

  • 我是Perl 6的新手。我的Atom编辑器中有以下代码,但我仍然不明白这是如何工作的。正如docs.raku.org所说,我复制了以下代码,但似乎不起作用。所以我将代码更改为: 输出为: 我知道它是有效的,但是docs.raku.org中的文档与此有点不同,它在第7行的AT-KEY方法之前没有“自”。有没有更详细的例子?

  • 问题内容: 我们有一个测试套件,主要使用带有Hamcrest匹配器的JUnit断言。我们的一个团队开始对AssertJ进行实验,并以其语法,灵活性和声明性给人留下了深刻的印象。JUnit提供的一项功能是我无法在AssertJ中找到与之等效的功能:添加自定义断言失败消息。 我们经常在比较不是为了人类可读性而制成的对象,这些对象将具有随机查找的Id或UUID,并且无法通过包含的数据来判断它们应该是什么

  • 我们经常比较那些不是为了人类可读性而制作的对象,这些对象将具有随机的ID或UUID,并且不可能根据它们包含的数据来判断它们应该是什么。对于我们的代码库来说,这是一个不可避免的情况,可悲的是,它实现的目的之一是在其他服务之间映射数据,而不需要理解它是什么。 在JUnit中,方法提供的版本在参数之前带有参数。这使得添加一个简短的调试字符串变得微不足道,从而揭示了一些问题,比如比较对人类应该意味着什么。