我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个消费者中,我只是增加消息计数器并提交偏移量。这个消费者没有信息丢失。
try {
//kafkaConsumer.registerTopic();
consumerThread = new Thread(() -> {
final String topicName1 = "topic-0";
final String topicName2 = "topic-1";
final String topicName3 = "topic-2";
final String topicName4 = "topic-3";
String groupId = "group-0";
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.49:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
try {
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Arrays.asList(topicName1, topicName2, topicName3, topicName4));
} catch (KafkaException ke) {
logTrace(MODULE, ke);
}
while (service.isServiceStateRunning()) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, byte[]> record : partitionRecords) {
processMessage(simpleMessage);
}
}
consumer.commitSync();
}
kafkaConsumer.closeResource();
}, "KAKFA_CONSUMER");
} catch (Exception e) {
}
这里的subscribe()用法似乎有问题。
Subscribe用于订阅主题而不是分区。要使用特定的分区,需要使用assign()。阅读文档中的摘录:
public void subscribe(java.util.Collection主题)
手动将分区列表分配给此使用者。此接口不允许增量赋值,并将替换先前的赋值(如果有的话)。如果给定的主题分区列表为空,则与unsubscribe()处理相同。通过此方法进行的手动主题分配不使用使用者的组管理功能。因此,当组成员资格或集群和主题元数据改变时,将不会触发再平衡操作。注意,不能同时使用assign(Collection)的手动分区分配和subscribe(Collection,ConsumerRebalanceListener)的组分配。
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow
谢了。
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1