我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。
这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。
在新的API(0.10.1.1)中,有一些方法
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。
有什么想法/例子吗?
我想您可以混合位置()和提交()方法。位置()方法获取将被获取的下一条记录的偏移量,而提交()方法获取给定分区的最后提交的偏移量(如留档中所述)。在对较低优先级进行轮询之前,您可以检查较高优先级的位置()和提交()。如果位置()高于提交(),您可以暂停较低优先级的(),并在较高优先级的()上轮询(),然后恢复较低优先级。
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.
我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我正在研究Kafka是否支持任何要处理的队列或消息的优先级。 看来它不支持任何这样的事情。我在谷歌上找到了这个也支持这一点的邮件存档:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOEJIJHVHSR=d6astihpsqwvg6vk5xylam6ymdcd6uauoxf-dq@mai
然而,当在我的环境中测试此示例时,我得到了一个异常。