I am using spring-kafka `2.2.7.RELEASE` to create a batch consumer, and I am trying to understand how consumer rebalancing works when my record processing time exceeds max.poll.interval.ms.
Here is my configuration:
public Map<String, Object> myBatchConsumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
    return configs;
}
Here is my factory setup:
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(myBatchConsumerConfigs()));
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
factory.setStatefulRetry(true);
factory.setBatchListener(true);
factory.setBatchErrorHandler(myBatchConsumerSeekToCurrentErrorHandler);
factory.getContainerProperties().setConsumerRebalanceListener(myBatchConsumerAwareRebalanceListener);
factory.setRecoveryCallback(context -> {
logger.logInfo("In recovery call back for KES Batch Consumer", this.getClass());
myBatchConsumerDeadLetterRecoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"), (Exception) context.getLastThrowable());
return null;
});
return factory;
}
I have added a custom consumer rebalance listener, as shown below:
@Component
public class MyBatchConsumerAwareRebalanceListener implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
partitions.forEach(
topicPartition -> {
System.out.println(" onPartitionsRevokedBeforeCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
}
);
//springLogger.logInfo(" onPartitionsRevokedBeforeCommit", getClass());
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
partitions.forEach(
topicPartition -> {
System.out.println(" onPartitionsRevokedAfterCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
}
);
//springLogger.logInfo(" onPartitionsRevokedAfterCommit", getClass());
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
partitions.forEach(
topicPartition -> {
System.out.println(" onPartitionsAssigned - topic - "+ topicPartition.topic() +" partition - "+topicPartition.partition());
}
);
//springLogger.logInfo(" onPartitionsAssigned", getClass());
}
}
Here is my consumer, where I've added a delay/sleep of 400 seconds, which is greater than max.poll.interval.ms (300,000 ms, i.e. 300 seconds):
@KafkaListener(groupId = "TestBatchConsumers", topics = TEST_KES_BATCH_CONSUMER_TOPIC, containerFactory = "myBatchConsumerContainerFactory")
public void consumeRecords(List<ConsumerRecord<String, Organization>> consumerRecords) {
long startTime = System.currentTimeMillis();
System.out.println("Processing started at "+startTime);
consumerRecords.forEach(consumerRecord -> {
System.out.println(
"Received consumerRecord on topic " + consumerRecord.topic() + ", partition " + consumerRecord.partition()
+ ", at offset " + consumerRecord.offset() + ", with key " + consumerRecord.key() );
try {
Thread.sleep(400000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("Processing completed at "+ System.currentTimeMillis());
long processingTimeInSec = (System.currentTimeMillis() - startTime)/1000 ;
System.out.println(processingTimeInSec);
}
Now I expect the consumer group to rebalance, since the processing time exceeds max.poll.interval.ms, but I don't see any such behavior. Am I missing something here?
Please advise.
It works as expected for me:
@SpringBootApplication
public class So67520619Application {
public static void main(String[] args) {
SpringApplication.run(So67520619Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("so67520619", "foo");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67520619").partitions(1).replicas(1).build();
}
}
@Component
class Listener implements ConsumerAwareRebalanceListener {
private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@KafkaListener(id = "so67520619", topics = "so67520619")
public void listen(List<String> in) throws InterruptedException {
LOG.info(in.toString());
Thread.sleep(12000);
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
LOG.info("Assigned: " + partitions);
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
LOG.info("Revoked: " + partitions);
}
}
spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
2021-05-13 11:13:16.339 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 11:13:16.358 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo]
2021-05-13 11:13:26.416 INFO 20954 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-c9a440bf-9076-4575-813d-3efb0054f5f7 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 11:13:28.365 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 11:13:28.370 ERROR 20954 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
... 3 common frames omitted
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Revoked: [so67520619-0]
2021-05-13 11:13:28.372 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.376 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 11:13:28.376 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.485 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 3: {consumer-so67520619-1-15ce3150-1aa3-4b43-a892-fbd54e8ed919=Assignment(partitions=[so67520619-0])}
2021-05-13 11:13:28.486 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 3
2021-05-13 11:13:28.487 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 11:13:28.487 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 11:13:28.488 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 11:13:28.489 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 11:13:28.490 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 11:13:28.494 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo]
EDIT
It still works for me with 60/65 seconds...
2021-05-13 17:24:28.111 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 17:24:28.130 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo, foo, foo]
2021-05-13 17:25:28.147 INFO 37063 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-269ac261-3838-4925-a9a7-fd0687db3522 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 17:25:33.135 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 17:25:33.141 ERROR 37063 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
...
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Revoked: [so67520619-0]
2021-05-13 17:25:33.142 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.145 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 17:25:33.145 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.250 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 9: {consumer-so67520619-1-bd22a252-64f2-4be3-a6eb-8371b8f95ff2=Assignment(partitions=[so67520619-0])}
2021-05-13 17:25:33.254 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 9
2021-05-13 17:25:33.254 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 17:25:33.255 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 17:25:33.256 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 17:25:33.258 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 17:25:33.258 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 17:25:33.261 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo, foo, foo]
Kafka complains after 60 seconds, and the commit fails 5 seconds later.
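The timeline in the EDIT's log can be reconstructed with simple arithmetic. This is a sketch, assuming max.poll.interval.ms was set to 60000 for that run and the listener slept for 65 seconds (both read off the log timestamps): the heartbeat thread sends LeaveGroup as soon as the poll deadline passes, but the commit can only fail once the listener returns and the consumer thread attempts it.

```java
// Back-of-the-envelope timeline for the poll-timeout rebalance above.
// Assumed values: max.poll.interval.ms = 60000, listener sleep = 65s.
public class RebalanceTimeline {

    // Seconds after the last poll() at which the heartbeat thread
    // sends the LeaveGroup request (the poll deadline itself).
    static long leaveGroupAfterSec(long maxPollIntervalSec) {
        return maxPollIntervalSec;
    }

    // Seconds between LeaveGroup and the failed offset commit: the
    // consumer thread is blocked in the listener and can only attempt
    // the commit once the sleep finishes.
    static long commitFailureDelaySec(long listenerSleepSec, long maxPollIntervalSec) {
        return listenerSleepSec - maxPollIntervalSec;
    }

    public static void main(String[] args) {
        System.out.println("LeaveGroup sent after " + leaveGroupAfterSec(60) + "s");
        System.out.println("Commit fails " + commitFailureDelaySec(65, 60) + "s later");
    }
}
```

This matches the log: LeaveGroup at 17:25:28 (60 s after the 17:24:28 poll) and the CommitFailedException at 17:25:33 (5 s later, when the 65-second sleep ends).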