我正在使用具有非阻塞重试的SpringKafka2.8.0。我使用的是具有固定回退和死信主题的单个重试主题。这是我的配置:
@Bean
public RetryTopicConfiguration receiptRetryConfiguration(
KafkaTemplate<String, byte[]> kafkaTemplate,
ConcurrentKafkaListenerContainerFactory<String, byte[]> retryListenerContainerFactory
) {
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(3)
.fixedBackOff(5000)
.setTopicSuffixingStrategy(SUFFIX_WITH_INDEX_VALUE)
.useSingleTopicForFixedDelays()
.doNotAutoCreateRetryTopics()
.dltHandlerMethod(
new EndpointHandlerMethod(ConsumerErrorsHandler.class, "handleError")
)
.notRetryOn(List.of(SerializationException.class, EmptyMessageValueException.class))
.listenerFactory(retryListenerContainerFactory)
.create(kafkaTemplate);
}
我正在尝试实现失败标头管理,并将自定义error_code
标头添加到发送到 DLT 的邮件中:
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
dlpr.setHeadersFunction((record, exception) ->
new RecordHeaders().add("error_code", getErrorCode(exception).getBytes(StandardCharsets.UTF_8))
);
});
return factory;
}
它可以工作,但调用dlpr.setHeadersFunction(…)
将禁用添加重试标头的标准机制:retry_topic-timests
、
、
retry_topic-original-timstamp
除了
retry_topic-*
标头之外,还有什么方法可以将自定义标头添加到发送到DLT的消息中吗?
是否可以将
死信发布恢复器工厂
配置为仅针对发送到 DLT 主题的消息,而不是为重试主题配置?
请打开新功能问题;我认为我们需要添加一个<code>getHeadersFunction()</code>,这样如果复合函数已经存在,就可以构建它(或者我们应该添加一个
方法)。<="">
https://github.com/spring-projects/spring-kafka/issues
现在,您必须使用反射(例如直接字段访问器
)来获取对现有函数(标头函数
字段)的引用。
我是springboot kafka的新手,我在这篇文章后面创建了一个例子。 https://www.codenotfound.com/spring-kafka-boot-example.html 我发现您可以将kafka元数据设置为标题,但这不符合我的目的。 我能做到这一点吗?如果可能的话,我很感激你能分享一个例子。
问题内容: 我正在编写一个服务器/客户端程序,客户端将文本消息发送到服务器。我使用了非阻塞I / O(NIO API),但是服务器上的消息无法正确显示。这是我在服务器上的代码: 这是一段客户代码: 在运行时,当客户端向服务器发送消息时,将显示空格字符或一条消息。 问题答案: 您需要在之前和之后。 注意在返回-1时循环并没有任何意义。为了天堂,这意味着同伴断开连接。
/** 自定义消息发送接口(目前只支持文本消息发送) @param chatController 聊窗实体(必须是小能SDK创建的聊窗实体NtalkerChatController类型) @param type 消息类型: 11:文本消息 12:图片消息 13:语音消息 14:视频消息 @param message 消息内容 */ NSString *str = @“自定义文本消息test”;
问题内容: 我们有一个测试套件,主要使用带有Hamcrest匹配器的JUnit断言。我们的一个团队开始对AssertJ进行实验,并以其语法,灵活性和声明性给人留下了深刻的印象。JUnit提供的一项功能是我无法在AssertJ中找到与之等效的功能:添加自定义断言失败消息。 我们经常在比较不是为了人类可读性而制成的对象,这些对象将具有随机查找的Id或UUID,并且无法通过包含的数据来判断它们应该是什么
我们经常比较那些不是为了人类可读性而制作的对象,这些对象将具有随机的ID或UUID,并且不可能根据它们包含的数据来判断它们应该是什么。对于我们的代码库来说,这是一个不可避免的情况,可悲的是,它实现的目的之一是在其他服务之间映射数据,而不需要理解它是什么。 在JUnit中,方法提供的版本在参数之前带有参数。这使得添加一个简短的调试字符串变得微不足道,从而揭示了一些问题,比如比较对人类应该意味着什么。
我是Perl 6的新手。我的Atom编辑器中有以下代码,但我仍然不明白这是如何工作的。正如docs.raku.org所说,我复制了以下代码,但似乎不起作用。所以我将代码更改为: 输出为: 我知道它是有效的,但是docs.raku.org中的文档与此有点不同,它在第7行的AT-KEY方法之前没有“自”。有没有更详细的例子?