我已经用kafkahandler
实现了一个Kafka消费者。我的使用者应该使用事件,然后为每个事件向其他服务发送REST请求。只有当REST服务关闭时,我才想重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
我使用ExceptionClassifierRetryPolicy
来设置异常和相应的重试策略。
如果我有机会了解SeekToCurrEnterrorHandler
中的哪个记录失败了,那么我将创建SeekToCurrEnterrorHandler
的自定义实现,以检查失败的消息是否可重试(通过使用ThrownException
字段)。如果它是不可重试的,那么我将从记录
列表中删除它以重新查找。
关于如何实现这个功能有什么想法吗?
注意:enable.auto.commit
设置为false
,auto.offset.reset
设置为最早
。
谢谢你!
Apache Kafka2.2
(尚未发布)的FailedRecordTracker
:
https://docs.spring.io/spring-kafka/docs/2.2.0.m2/reference/html/whats-new-part.html#_listener_container_changes
从2.2版开始,SeekToCurrEnterrorHandler
现在可以恢复(跳过)一个持续失败的记录。默认情况下,在10次失败后,将记录失败的记录(错误)。您可以使用自定义恢复器(biconsumer
)和/或最大故障来配置处理程序。
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures - e.g. send to a dead-letter topic
}, 3);
因此,您所需要的只是将FailedRecordTracker
和SeekToCurrEnterRorHandler
源代码从主
复制/粘贴到项目中,您就会拥有您正在寻找的功能:
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/failedrecordtracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/seektocurrenterrorhandler.java
find 查找匹配标准的记录,可以链式查询(见下文): Person.find({status:'active'}, function(err, results) { // ... }); 你也可以限制结果的个数,这条语句限制结果为10个: Person.find({status:'active'}, 10, function(err, results) { // ... }); Pers
这里有一段代码演示了一个案例,其中用于处理DNS的Java标准扩展无法正确查找特定域的MX记录,而实用程序这样做没有问题。我已经在不同的机器、网络和操作系统(AWS Ubuntu、Linode CentOS、本地Windows)上复制了这一点。 同时,这是: 可能是什么问题?我对可靠地从 Java 中查找 MX 记录感兴趣。 环境:
在互联网上研究了这个话题后,我找不到满意的答案,想把这个例子展示给SO社区,看看这是与我们的环境或数据相关的单个一次性案例,还是与Spark相关的更大模式。 Spark 1.6.1 Spark conf:执行器2个,执行存储器32G,执行器核心4个。 数据在一个内部配置单元表中,该表存储为ORC文件类型,用zlib压缩。压缩文件的总大小为2.2GB。
在下面的java代码中,我正在进行DNS SRV记录查找,以解析给定域名的目标域名和相关端口,例如root@1000000000.blubluzone.com.用/HERE/below表示的查找函数以某种方式返回null,我无法获得查询结果(即记录数组为null)。因此,当在for循环中访问记录数组时,将引发空指针异常。 你认为我缺少什么来让下面的代码工作。我正在使用dnsjava,相关的API
我有下表: 我还有以下表: 现在我有以下疑问: 长话短说--我的目标是,给定值,从表中获取最新的值。 它应该是这样工作的:给定值-在表中搜索与相同值的匹配记录(注意此列是唯一的)。然后-在表中查找最新的值(使用值)。使用与表中的列匹配的列在此表中搜索它。 当前,该查询将返回,但不返回(我需要它是3030)。你能帮我修一下吗?
问题内容: 我想在MySQL数据库中提取重复记录。这可以通过以下方式完成: 结果是: 我想将其拉出,以使其显示重复的每一行。就像是: 关于如何做到这一点有什么想法?我试图避免做第一个,然后在代码中用第二个查询查找重复项。 问题答案: 关键是重写此查询,以便可以将其用作子查询。