我有一个简单的Kafka 2.4.1(Confluent 5.4.1)安装程序在本地Docker中运行。并且我使用了用Java编写的测试生产者和测试使用者。该代码可在GitHub中获得。
单元测试做:
问题是:使用者的第一次运行将跳过主题中已经生成的可用消息。真正的问题是,那些错过的消息会丢失(从使用者的角度来看:偏移量被移动到主题中的最新位置,滞后为0<-这在Kafka工具中都是可见的)
第一次运行后的结果是:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
第二轮测试给出:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
我试着用不同的变体修修补补,结果总是一样的:
查看GitHub存储库中的代码,您似乎没有设置使用者配置auto.offset.reset
。根据文档,此设置默认为lates
。这意味着,如果您的测试主题的代理不知道消费者组,它将只等待新的传入消息。因此,事先由生产者测试编写的消息不能由TestConsumer使用。
本文档很好地解释了Kafka中的消费者群体概念。
我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决
我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...
我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较