我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例
我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容:
INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请求处理器)
INFO已关闭的套接字连接的客户端/127.0.0.1:59899,其中有sesionid 0x144a4854325004d(org.apache.zookeeper.server.NIOServerCnxn)
我正在使用默认配置。我如何让我的消费者永远倾听?
您可能需要尝试运行ConsoleConsumer。
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
我们正在tomcat服务器中部署kafka消费者。消费者是使用Spring-Kafka2.1.7构建的。每个tc容器可以有多个属于同一个使用者组的使用者(使用ConcurrentKafkaListenerContainerFactory)。作为一般模式,在我的使用案例中,使用者以事务性的方式从一个主题读取并生产到另一个主题。tc服务器由通常的启动和关闭shell脚本启动和停止。如果要优雅地关闭co
我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。 这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。 在新的API(0.10.1.1)中,有一些方法 但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。 有什么想法/例子吗?
我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka