对于我的测试,我在队列中发布了700万条消息。我创建了一个包含30个消费者线程消费者组,每个分区一个。我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。不幸的是,情况并非如此。在我的例子中,数据处理是复杂的,平均需要1-2分钟才能完成,这导致了一系列分区重新平衡,因为线程不能按时运行。我在日志里看到一堆消息
组FULL_GROUP的自动偏移量提交失败:无法完成提交,因为该组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。
这会导致同一消息被多次处理。我尝试使用会话超时、max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理BigTime的速度。下面是一些配置参数。
metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
这是我的消费者代码
while (true) {
try{
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
if(record.value()!=null){
TextAnalysisRequest textAnalysisObj = record.value();
if(textAnalysisObj!=null){
// Process record
PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
}
}
}
}catch(Exception ex){
LOGGER.error("Error in Full Consumer group worker", ex);
}
我真的在寻找一些建议/最佳实践在这种情况下。特别是关于心跳、请求超时、最大轮询记录、自动提交间隔、轮询间隔等的推荐配置设置。如果kafka不适合我的用例,请让我知道。
您可以从异步处理消息开始,在一个独立的线程中,而不是从Kafka读取消息的线程。这样自动提交将非常快,Kafaka不会削减您的会话。类似这样的事情:
private final BlockingQueue<TextAnalysisRequest> requests =
new LinkedBlockingQueue();
在读取线程中:
while (true) {
try{
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
if(record.value()!=null){
TextAnalysisRequest textAnalysisObj = record.value();
if(textAnalysisObj!=null){
// Process record
requests.offer(textAnalysisObj);
}
}
}
}
catch(Exception ex){
LOGGER.error("Error in Full Consumer group worker", ex);
}
在处理线程中:
while (!Thread.currentThread().isInterrupted()) {
try {
TextAnalysisRequest textAnalysisObj = requests.take();
PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
} catch (InterruptedException e) {
LOGGER.info("Process thread interrupted", e);
Thread.currentThread().interrupt();
} catch (Throwable t) {
LOGGER.warn("Unexpected throwable while processing.", t);
}
}
我有两个Kafka监听器组件,每个组件监听不同的主题并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端id吗?还是必须使用不同的客户端id?如果客户端id必须不同,我想了解一个可以有效使用客户端id的用例。
我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我有一个Spring-boot应用程序,可以听Kafka。为了避免重复处理,我尝试手动提交。为此,我在阅读主题后异步提交了一条消息。但是我被困在如何实现消费者幂等,这样记录就不会被处理两次。