当我更改DeadLetterPublishingRecoverer destionationResolver时,我发现了一个错误。
当我使用:
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());
它工作完美。
但是,如果使用\u ERR而不是。错误,出现错误:
2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}
2020-08-05 12:53:10,278[kafka-生产者-网络-线程|生产者-kafka-tx-group1.ABC_TEST_XPTO.0]ERRORorg.apache.kafka.clients.元数据-[生产者clientId=生产者-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0]元数据响应报告无效主题[ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,309[org.springframework.kafka.KafkaListenerEndpoint Container#0-0-C-1]ERROR o. s. k. s. LoggingProducerListener-发送消息时抛出的异常key='null'和payload='XPTOEvent(Super=Event(id=CAPBA2548,目标=ABC_TEST_XPTO, he...'到主题ABC_TEST_XPTO_ERR和分区0:org.apache.kafka.common.errors.InvalidTopicException:无效主题:[ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,320[org.springframework.kafka.KafkaListenerEndpoint Container#0-C-1]ERROR o. s. k. l.DeadLetterPublishingRecoverer-死信发布失败:生产者记录(主题=ABC_TEST_XPTO_ERR,分区=0,标题=记录标题(标题=...org.springframework.kafka.KafkaException:发送失败;嵌套异常org.apache.kafka.common.errors.无效主题异常:无效主题:[ABC_TEST_XPTO_ERR]在org.springframework.kafka.core.KafkaTemplate.do发送(KafkaTemplate.java:573)在org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)在org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:290)在org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:226)在org.springframework.kafka.listener.DeadLetterPublishingR(DeadLetterPublishingRecoverer.java:54)在org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)强文本
“我的主题”在名称中间使用了“\uu”,例如ABC\u TEST\u XPTO,因此如果可能的话,我希望将死信主题设置为“ERR”
我的环境
>
Spring Boot 2.3.2。发布
Spring-Kafka 2.5.3。释放,但2.5.4出现同样的问题。释放
Java11
私有静态最终BiFunction
@组件类ContainerFactoryConfigurer{
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
ChainedKafkaTransactionManager<?, ?> tm,
KafkaTemplate<Object, Object> template) {
factory.getContainerProperties().setTransactionManager(tm);
DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {
}, new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
factory.setAfterRollbackProcessor(rollbackProcessor);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER), new FixedBackOff(0L, Long.valueOf(maxAttemps)));
errorHandler.setCommitRecovered(true);
errorHandler.setAckAfterHandle(true);
factory.setErrorHandler(errorHandler);
}
}
谢谢DPG
这对我来说很好...
@SpringBootApplication
public class So63270367Application {
public static void main(String[] args) {
SpringApplication.run(So63270367Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
}
@KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO_ERR")
public void listen(String in) {
System.out.println(in);
}
}
spring.kafka.consumer.auto-offset-reset=earliest
也许你的经纪人对主题名称有一些规定;也许看看经纪人的日志?
编辑
正如我在评论中所说;记录从哪里发布并不重要;这对我仍然有效。。。
@SpringBootApplication
public class So63270367Application {
public static void main(String[] args) {
SpringApplication.run(So63270367Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("ABC_TEST_XPTO").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicErr() {
return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
}
@KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO")
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@KafkaListener(id = "so63270367err", topics = "ABC_TEST_XPTO_ERR")
public void listenErr(String in) {
System.out.println("From DLT:" + in);
}
@Bean
public SeekToCurrentErrorHandler eh(KafkaOperations<String, String> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
template,
(cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition())),
new FixedBackOff(0L, 0L));
}
}
From DLT:foo
简单问题: 假设我有一个具有3个分区的主题:Topic:StateEvents P1、P2和P3。 让我们假设生产者生成20条消息: 1, 2, 3, ..........20 我的问题是: 当制作人生成这些消息时: 1)每个消息将只在且仅在1个分区?也就是说,1在P1,2在P2,3在P3,然后4在P1,5在P2,6在P3,以此类推? 2)如果#1为真,当消费者订阅时,它将订阅所有分区,以便获得所
通过Kafka文档和各种其他资源,我了解到Kafka中的消息被组织成主题。此外,主题可以分解为多个分区,每个分区可以托管在不同的服务器上。这提供了冗余和可伸缩性。 我不确定这里的“破碎”这个词是什么意思。这是否意味着,如果添加到主题的消息是,例如“1 2 3 4 5 6 7”,那么在将其分解为分区后,我们将有一个分区仅包含整个主题的子部分。就像一个分区有“1 2 3”,而另一个分区有“4 5 6”
我在Android开发方面相当陌生,并且一直在努力为我正在开发的应用程序进行工作构建。 使用 Gradle 进行构建后,我收到以下错误: :app:transformClassesWithJarMergingForDebug失败 失败:生成失败,出现异常。 > < li >哪里出错:任务执行失败”:app:transformClassesWithJarMergingForDebug。 < bloc
我非常需要帮助,但在网上找不到关于这个特定主题的任何东西(许多相关的问题没有得到回答)。 具体来说,我需要能够从中央和外部代码存储库下载代码(JAR)。这是由引导代码完成的,引导代码需要将其添加到类装入器的类路径中,以便以后使用。这就是我们进入这个已经讨论了很多次的话题的时候。我不喜欢黑客,所以我尝试了以下方法: 尝试#1:创建一个为此目的配置的URLClassLoader实例,然后通过它调用代码
我看到并实现了KafkaTemplate方法。使用默认分区器类发送(主题、消息)。 但在这里,我不是在传递钥匙。我有一个简单的自定义分区器类,我还想发送到kafka服务器,比如KafkaTemplate(主题、键、消息),在producerConfig中,我为分区设置了customPartitioner类。 如果我提供自定义分区器,我看到KafkaTemboard的这个Will send(Topi
结果,更改分区数的命令运行良好。但是,我不知道上面的错误是什么样的。 (供参考,共有10家券商,500多个主题。)