我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?
多亏了阿列克谢(我也投了赞成票),我似乎基本上按照同样的想法解决了我的问题。
只是想分享我的经验……在我们的案例中,我们根据要求使用Kafka。
我尝试了…KafkaConsumer。分配()
重复(使用Thread.sleep(100)
),但似乎没有帮助。添加KafkaConsumer。投票(50)
似乎已经为消费者(群体)做好了准备,并且也收到了第一个响应。测试了几次,现在它一直在工作。
顺便说一句,测试需要停止应用程序
PS:只需调用poll(50)
不带赋值()
获取逻辑可能无法保证消费者(组)已经准备好了。
您可以执行以下操作:
我有一个从kafka主题读取数据的测试
所以您不能在多线程环境中使用KafkaConsumer,但您可以传递参数“AtomicReference assignment”,在使用者线程中更新它,然后在另一个线程中读取它例如,项目中的工作代码被截取以进行测试:
private void readAvro(String readFromKafka,
AtomicBoolean needStop,
List<Event> events,
String bootstrapServers,
int readTimeout) {
// print the topic name
AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();
long startTime = System.currentTimeMillis();
long maxWaitingTime = 30_000;
for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
+ assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
if (assignments.size() > 0) {
break;
}
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
needStop.set(true);
break;
}
}
System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}
private void readAvro(String bootstrapServers,
String readFromKafka,
AtomicBoolean needStop,
List<Event> events,
int readTimeout,
AtomicReference<Set<TopicPartition>> assignment) {
KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
System.out.println("Subscribed to topic: " + readFromKafka);
consumer.subscribe(Collections.singletonList(readFromKafka));
long started = System.currentTimeMillis();
while (!needStop.get()) {
assignment.set(consumer.assignment());
ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
events.addAll(CommonUtils4Tst.readEvents(records));
if (readTimeout == -1) {
if (events.size() > 0) {
break;
}
} else if (System.currentTimeMillis() - started > readTimeout) {
break;
}
}
needStop.set(true);
synchronized (MainTest.class) {
MainTest.class.notifyAll();
}
consumer.close();
}
P. S.
需要停止-全局标志,停止所有正在运行的线程,如果有成功的失败
事件-对象列表,我想检查
readTimeout-我们将等待多少时间,直到读取所有数据,如果readTimeout==-1,然后停止,当我们读取任何东西
>
您可以使用消费者。assignment()
,它将返回一组分区,并验证是否分配了该主题可用的所有分区。
如果您使用的是spring-kafka项目,您可以包括spring-kafka测试依赖性,并使用下面的方法等待主题分配,但您需要有容器<代码>ContainerTestUtils。waitForAssignment(对象容器,int分区)
我有一个在Docker中运行的Cassandra,我想在数据库准备就绪时启动一个CQL脚本。我尝试检查端口以检测它何时就绪: 但是在数据库真正准备好之前就打开了端口,因此失败。如何正确检查Cassandra状态并启动脚本?提前道谢。
我目前正在做一个kafka java项目。我是新来的,我发现很难理解与Kafka生产者/消费者设计相关的几个基本概念。 > 比方说,我有一个带有单个分区的主题,我有一个生产者正在写这个主题,一个消费者正在从这个主题中消费。如果我部署同一个应用程序的多个实例,每个实例将运行自己的消费者。在这种情况下,因为所有消费者都属于同一个group pId,所以消息是否会在多个实例上运行的消费者之间平均分配?
我想在远程位置检查Kafka消费者的连接。 可以确定是否将使用者分配给分区。 在远程位置,我可以从Kafka代理获得有关该主题的详细信息。 但是消费者能否保证消费者能够收到消费者与主题分区匹配的消息?
我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费
谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但