我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导)
问题是:即使我注释掉ack.acknowledge(),偏移量仍然在更新,消费者正在从下一个偏移量中读取,这是出乎意料的(到目前为止,我的理解是)
消费者配置[请注意,我将ConsumerConfig.ENABLE_AUTO_COMMIT_Config设置为false,并将factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private AdapterStreamProperties appProperties;
@Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
private String receiveBufferBytes;
@Bean
public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"adapter");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
然后我的消费者以这种方式消费[即使我注释掉ack.acknowledge ()' , 下次它仍然从下一个偏移读取:
@KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
System.out.println("----------------------------------------------------------------------------");
System.out.println("Reading from Offset: " + offset + ", and Partition: " + partition);
System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " + record.value());
System.out.println("----------------------------------------------------------------------------");
NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());
if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
}
else {
kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
}
ack.acknowledge();
}
我的gradle的Kafkaapi版本。建造:
//Kafka Dependencie
implementation 'org.apache.kafka:kafka-streams:2.0.1'
implementation 'org.apache.kafka:kafka-clients:2.0.1'
任何洞察力都会有所帮助。
提前谢谢
ack.acknowledge()
仅表示您已提交,您的消费者已成功处理该消息。
这将更新消费者组偏移量,如果应用程序在没有ack的情况下停止,则当前偏移量将不会提交给kafka(您已处理但未确认的最后一条消息)。
具有相同用户组(和相同分配分区)的新用户将再次使用相同的消息,因为应用程序将从用户组偏移量读取起始点。
如果你ack或don't ack没有影响您的当前侦听器,它将简单地继续处理新消息(只要没有错误发生,我想重新平衡也可能是偏移重载的来源(不完全确定))
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。
谢了。
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移