我写了一个Kafka消费者从主题中获取所有记录,然后只进入下一步,但它没有获取所有记录。
while (fetchedRecord) {
ConsumerRecords<String, String> records = consumer.poll(10);
System.out.println("Waiting for Records from Party-Resolved-Update");
Thread.sleep(40000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record fetched");
end++;
System.out.println(record.value());
if (!StringUtils.isEmpty(record.value())) {
fetchedRecord = false;
response = record.value();
FSecurity sc = new FSecurity();
sc.init();
decryptResponse.add(sc.decryptData(response));
System.out.println("decryptResponse=" + decryptResponse);
}
}
}
请正确描述你的问题。
假设您无法使用下一组记录。
解决方案要么提供enable。汽车将
提交为true
,或添加消费者。commitSync()
语句在代码中显式执行。这将允许您轮询下一个偏移记录。
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用
我必须记录消费者在SpringKafka中花费的时间。由于kafkaListener方法对每条消息都执行,因此在那里放置一个记录器是行不通的。此外,有时一些信息会丢失,而不是被消费者消费掉。我应该把记录器放在哪里,以找出消费者启动后的弹性时间。使用者不会退出或关闭,其轮询将无限期进行
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了
我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。
我的问题是关于Kafka在爪哇的消费者 > 已启动Kafka服务器 创建的主题 创作者 创建的消费者 我在终端中做的所有这些事情,工作正常,能够在消费者处正确接收日志。运行下面的消费者(在Java),但没有收到任何记录。它继续汇集在 也没有收到任何记录。 下面给出的我的 java 消费者代码 请告诉我在java消费者类中接收消息,我在配置中做错了什么吗?属性中的“group.id”是怎么回事? 下