当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka SeekToCurrentErrorHandler查找哪个记录失败

鲜于喜
2023-03-14

我已经用kafkahandler实现了一个Kafka消费者。我的使用者应该使用事件,然后为每个事件向其他服务发送REST请求。只有当REST服务关闭时,我才想重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

我使用ExceptionClassifierRetryPolicy来设置异常和相应的重试策略。

如果我有机会了解SeekToCurrEnterrorHandler中的哪个记录失败了,那么我将创建SeekToCurrEnterrorHandler的自定义实现,以检查失败的消息是否可重试(通过使用ThrownException字段)。如果它是不可重试的,那么我将从记录列表中删除它以重新查找。

关于如何实现这个功能有什么想法吗?

注意:enable.auto.commit设置为falseauto.offset.reset设置为最早

谢谢你!

共有1个答案

金皓君
2023-03-14

Apache Kafka2.2(尚未发布)的FailedRecordTracker:

https://docs.spring.io/spring-kafka/docs/2.2.0.m2/reference/html/whats-new-part.html#_listener_container_changes

从2.2版开始,SeekToCurrEnterrorHandler现在可以恢复(跳过)一个持续失败的记录。默认情况下,在10次失败后,将记录失败的记录(错误)。您可以使用自定义恢复器(biconsumer)和/或最大故障来配置处理程序。

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);

因此,您所需要的只是将FailedRecordTrackerSeekToCurrEnterRorHandler源代码从复制/粘贴到项目中,您就会拥有您正在寻找的功能:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/failedrecordtracker.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/seektocurrenterrorhandler.java

 类似资料:
  • find 查找匹配标准的记录,可以链式查询(见下文): Person.find({status:'active'}, function(err, results) { // ... }); 你也可以限制结果的个数,这条语句限制结果为10个: Person.find({status:'active'}, 10, function(err, results) { // ... }); Pers

  • 这里有一段代码演示了一个案例,其中用于处理DNS的Java标准扩展无法正确查找特定域的MX记录,而实用程序这样做没有问题。我已经在不同的机器、网络和操作系统(AWS Ubuntu、Linode CentOS、本地Windows)上复制了这一点。 同时,这是: 可能是什么问题?我对可靠地从 Java 中查找 MX 记录感兴趣。 环境:

  • 在互联网上研究了这个话题后,我找不到满意的答案,想把这个例子展示给SO社区,看看这是与我们的环境或数据相关的单个一次性案例,还是与Spark相关的更大模式。 Spark 1.6.1 Spark conf:执行器2个,执行存储器32G,执行器核心4个。 数据在一个内部配置单元表中,该表存储为ORC文件类型,用zlib压缩。压缩文件的总大小为2.2GB。

  • 在下面的java代码中,我正在进行DNS SRV记录查找,以解析给定域名的目标域名和相关端口,例如root@1000000000.blubluzone.com.用/HERE/below表示的查找函数以某种方式返回null,我无法获得查询结果(即记录数组为null)。因此,当在for循环中访问记录数组时,将引发空指针异常。 你认为我缺少什么来让下面的代码工作。我正在使用dnsjava,相关的API

  • 我有下表: 我还有以下表: 现在我有以下疑问: 长话短说--我的目标是,给定值,从表中获取最新的值。 它应该是这样工作的:给定值-在表中搜索与相同值的匹配记录(注意此列是唯一的)。然后-在表中查找最新的值(使用值)。使用与表中的列匹配的列在此表中搜索它。 当前,该查询将返回,但不返回(我需要它是3030)。你能帮我修一下吗?

  • 问题内容: 我想在MySQL数据库中提取重复记录。这可以通过以下方式完成: 结果是: 我想将其拉出,以使其显示重复的每一行。就像是: 关于如何做到这一点有什么想法?我试图避免做第一个,然后在代码中用第二个查询查找重复项。 问题答案: 关键是重写此查询,以便可以将其用作子查询。