当前位置: 首页 > 知识库问答 >
问题:

Kafka高级消费者

马梓
2023-03-14

我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。

或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。

我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。

所以3个问题,

>

  • 我怎么知道没有更多消息要从该主题中读取?

    如果我对上述问题有答案,我该如何阻止它再听这个话题?

    有没有办法找到批量读取开始时的最大偏移量(我认为简单的消费者可以做到这一点),让高级消费者在那个点停下来?

  • 共有1个答案

    徐瑞
    2023-03-14

    您可以选择在指定时间内没有新消息到达时,将所有消息视为已读。这可以使用消费者属性consumer.timeout.ms进行设置。在该指定值通过而没有任何新消息到达后,消费者迭代器将抛出超时异常,您可以在消费者中处理该异常并退出。

     类似资料:
    • 我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请

    • 我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。 这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。 在新的API(0.10.1.1)中,有一些方法 但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。 有什么想法/例子吗?

    • 嗨,我正在将kafka升级到.9,并将kafka consumer升级到与.9一起发布的新java consumer。在升级时,我使用的是现有的主题,步骤只是停止.8 kafka并开始指向相同log.dirs的.9 kafka。在消费者端,我使用的是相同的组名和主题名,但是新的消费者从主题中的起始位置再次使用消息。我已经把他们交了。我正在添加auto.offset.reset=最早。 任何想法为什

    • 我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。

    • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

    • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认