在附加新主题的消费者之前,我创建新主题并在apache Kafka中生成第一条消息。然后附加新主题的消费者,但第一条消息无法消费。为什么..?
这是我的测试用例。
# create new topic
$ kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NEW_TOPIC_NAME
# produce a first message
$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NEW_TOPIC_NAME
> send a first message
# then execute consumer
$ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NEW_TOPIC_NAME
> # no consume a first message
但在附加了新主题的消费者后,我会生成第二条消息,然后再正常消费。
默认情况下,kafka-console-consumer
从主题的末尾开始。
如果要使用以前生成的消息,可以设置--from-beginding
例如:
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic NEW_TOPIC_NAME --from-beginning
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
然而,当在我的环境中测试此示例时,我得到了一个异常。
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?
我正在配置一个Apache Artemis消息代理。代理将接受大文件,下游消费者访问该主题以处理最新文件。现在我想知道如何使最新文件可用于开发运行。因为消息一天只到达几次,所以测试运行需要访问最近发送的几条消息,不能等待下一条。 对于生产和登台系统,我发现持久订阅工作良好。我已经改编了ApacheCamel配置作为示例。以下是两个接收消息的消费者,每个消费者都使用持久订阅: 这很好。如果一个消费者