当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者-优先级更高的主题

南宫奇思
2023-03-14

我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。

这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。

在新的API(0.10.1.1)中,有一些方法

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。

有什么想法/例子吗?

共有2个答案

蔚承天
2023-03-14

我想您可以混合位置()和提交()方法。位置()方法获取将被获取的下一条记录的偏移量,而提交()方法获取给定分区的最后提交的偏移量(如留档中所述)。在对较低优先级进行轮询之前,您可以检查较高优先级的位置()和提交()。如果位置()高于提交(),您可以暂停较低优先级的(),并在较高优先级的()上轮询(),然后恢复较低优先级。

谭景福
2023-03-14

最后,我解决了这个问题,正如dawaw建议的那样,在处理循环中,我存储了我从中读取的所有主题/分区:

  • 开始补偿

每当(endOffset-提交)

 类似资料:
  • 我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答

  • 在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.

  • 我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我正在研究Kafka是否支持任何要处理的队列或消息的优先级。 看来它不支持任何这样的事情。我在谷歌上找到了这个也支持这一点的邮件存档:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOEJIJHVHSR=d6astihpsqwvg6vk5xylam6ymdcd6uauoxf-dq@mai

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。