当前位置: 首页 > 知识库问答 >
问题:

为什么消费者在创建主题(以编程方式)和发布消息后无法立即看到消息?

程俊力
2023-03-14

我注意到,如果我在创建主题后立即创建主题并发布消息(都在同一个生产者程序中),消费者程序不会分配分区(即consumer.assignment()返回空分区列表)。然而,如果在消费者订阅主题之前添加一些等待,那么分区分配工作正常,消费者接收消息。

为什么?

共有1个答案

呼延德华
2023-03-14

发送消息是异步的,这意味着发送方法只将记录添加到缓冲区,并立即返回。如果您在调用send后立即开始阅读。要传输的记录可能仍在缓冲空间中。这就是为什么您必须等待一段时间才能看到生成的消息。对于您的场景,KafkaProducer提供了一个刷新方法,该方法使所有缓冲记录立即可供发送(即使linger.ms大于0),并在完成与这些记录相关的请求时阻塞。请参阅下面的代码:

for(ConsumerRecord<String, String> record: consumer.poll(100))
    producer.send(new ProducerRecord("my-topic", record.key(), record.value());
producer.flush();
consumer.commit();
 类似资料:
  • 我有一个制作人发布一个名为“MyTopic”的主题的消息。我有两个不同的消费者组中的两个消费者在听这些消息。我按以下顺序开始这两个消费者和生产者。 1)启动组“GROUP1”中的使用者%1%2)启动生产者以发布数百条消息 过了一段时间,我检查消费者1的偏移量,这与我预期的一样: 输出: 输出: 对于任何其他拥有新消费群体的消费者来说,也存在同样的问题。为什么在消息发布后加入一个新的消费者组的消费者

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。