当前位置: 首页 > 知识库问答 >
问题:

在附加新主题的消费者之前,我在apache kafka中创建新主题并生成消息

萧无尘
2023-03-14

在附加新主题的消费者之前,我创建新主题并在apache Kafka中生成第一条消息。然后附加新主题的消费者,但第一条消息无法消费。为什么..?

这是我的测试用例。

# create new topic 
$ kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NEW_TOPIC_NAME

# produce a first message
$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NEW_TOPIC_NAME
  > send a first message

# then execute consumer
$ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NEW_TOPIC_NAME
  >   # no consume a first message

但在附加了新主题的消费者后,我会生成第二条消息,然后再正常消费。

共有1个答案

端木令
2023-03-14

默认情况下,kafka-console-consumer从主题的末尾开始。

如果要使用以前生成的消息,可以设置--from-beginding例如:

kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
  --topic NEW_TOPIC_NAME --from-beginning
 类似资料:
  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 我正在配置一个Apache Artemis消息代理。代理将接受大文件,下游消费者访问该主题以处理最新文件。现在我想知道如何使最新文件可用于开发运行。因为消息一天只到达几次,所以测试运行需要访问最近发送的几条消息,不能等待下一条。 对于生产和登台系统,我发现持久订阅工作良好。我已经改编了ApacheCamel配置作为示例。以下是两个接收消息的消费者,每个消费者都使用持久订阅: 这很好。如果一个消费者