我有一个 kafka 消费者类,它有一个主主题侦听器和一个 DLQ 侦听器。当主主题监听器无法处理消费者记录时,根据我的 bean 工厂,记录被推送到 DLQ 主题中。因此,DLQ 成功处理了该消息。但是,当我重新启动使用者应用程序时,我看到 DLQ 处理的消息再次被主主题侦听器使用,尽管它已成功处理。有人可以帮助我如何防止主要主题重新使用DLQ处理的消息吗?提前感谢您!
Kafka·Consumer.java
public class KafkaConsumer {
//MAIN TOPIC LISTENER
@KafkaListener(id = "main-topic", topics = "main-topic", groupId = "main", containerFactory = "kafkaListenerContainerFactory", clientIdPrefix = "main-topic")
public void mainListener(ConsumerRecord<String, String> consumerRecord, Acknowledgement ack) {
//The below line of code converts the consumerRecord value into an Object class and save the converted object to DB.
dbService.saveTodb(consumerRecord.value(), new ObjectMapper());
ack.acknowledge();
}
//DLQ LISTENER
@KafkaListener(id = "DLQ-topic", topics = "DLQ-topic", groupId = "main", clientIdPrefix = "DLQ", autostartup= "false")
public void mainListener(ConsumerRecord<String, String> consumerRecord, Acknowledgement ack) {
dbService.saveTodb(consumerRecord.value(), new ObjectMapper());
ack.acknowledge();
}
}
KafkaBeanFactory.java
public class KafkaBeanFactory{
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
var recoverer = new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition("DLQ-topic", record.partition()));
var errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(3, 20000));
errorHandler.addRetryableExceptions(JsonProcessingException.class, DBException.class);
errorHandler.setAckAfterHandle(true);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}
应用程序.yaml
kafka:
bootstrap-servers: localhost:9092 # sample value
client-id: main&DLQ
properties:
security:
protocol: SASL_SSL
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<string>";
security:
protocol: SASL_SSL
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
groupId: main
enable-auto-commit: false
auto.offset.reset: earliest
listener:
ack-mode: MANUAL_IMMEDIATE
您需要在默认错误处理程序上设置< code>commitRecovered属性。
/**
* Set to true to commit the offset for a recovered record.
* The container must be configured with
* {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
* Whether or not the commit is sync or async depends on the container's syncCommits
* property.
* @param commitRecovered true to commit.
*/
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正
我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:
假设我有2个Kafka主题登录和注销按用户名分区,并具有相等数量的分区。如果我运行一个由两个消费者组成的消费者组,同时使用两个主题,我是否可以确定每个用户的登录和注销事件将由同一个消费者处理?
我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
我正在构建一个由亚马逊服务提供支持的警报系统。 我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。 Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。 我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。 发电机代码: 以