当前位置: 首页 > 知识库问答 >
问题:

在Apache Kafka 2.0中有没有一种方法来区分消息的优先级?

解翰采
2023-03-14

编辑

如果其他人在这种特殊的情况下,我得到了一些类似于我在调整消费者配置后所寻找的东西。我创建了一个生产者,它将优先级消息发送到三个单独的主题(针对高/中/低优先级),然后创建了三个单独的消费者来消费每一个主题。然后我频繁地轮询优先级较高的主题,而不轮询优先级较低的主题,除非高的主题是空的:

    while(true) {
        final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
        final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);

        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records

poll timeout(.poll()方法的参数)确定没有要轮询的记录时等待的时间。我为每个主题设置了一个非常短的时间,但您可以为较低的优先级设置更低的时间,以确保在存在高pri消息时不会消耗宝贵的等待周期

max.poll.records配置显然决定了单个轮询中要抓取的最大记录数。对于较高的优先级,这一数字也可以设置得更高。

max.poll.interval.ms配置确定轮询之间的时间-处理max.poll.records消息所需的时间。这里澄清一下。

另外,我认为暂停/恢复整个使用者/主题可以这样实现:

    kafkaConsumer.pause(kafkaConsumer.assignment())
    if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

我不确定这是不是最好的方法,但我在别处找不到一个好的例子

我同意下面吴老师的观点,这并不是Kafka真正的正确用法。这是单线程处理,每个主题都有一个专用的使用者,但我将从这里着手改进这个过程。

处理器API

当我尝试处理器API时,我尝试通过检查poll()是否为空来确定高优先级KafkaConsumer当前是否正在处理任何内容,然后希望使用Med优先级使用者poll(),但第二个主题poll返回为空。似乎也没有一种简单的方法来获取主题上的所有TopicPartition以便调用KafkaConsumer.pause(partitions)

Kafka溪流

当我尝试kafkastreams时,我设置了一个流以从我的每个“优先级”主题使用,但是没有办法检查连接到较高优先级主题的kstreamkafkastreams实例当前是否空闲或正在处理。

我的代码基于这个文件

其他

我也尝试了这里的代码:priority-kafka-client,但它并没有像预期的那样工作,因为运行下载的测试文件有混合的优先级。

共有1个答案

吕岳
2023-03-14

这听起来像是应用程序中的一个设计问题--kafka最初被设计为提交日志,其中每条消息都以偏移量写入代理,并且各种使用者以提交它们的顺序使用它们,具有非常低的延迟和高吞吐量。考虑到分区而不是主题是Kafka中工作分配的基本单位,具有主题级别的优先级将很难在本地实现。

我建议你调整你的设计,使用Kafka以外的其他建筑构件,而不是试图削减你的脚,以适应鞋。您可以做的一件事是让您的生产者上传文件到适当的文件存储,并通过Kafka发送链接,包括元数据。然后,根据带宽状态,您的用户可以根据大文件的元数据来决定下载是否明智。这样,您可能更有可能拥有一个健壮的设计,而不是错误地使用Kafka。

如果您真的想要坚持只使用Kafka,一个解决方案是将大文件发送到一些固定数量的硬编码分区,只有当带宽良好时,用户才会使用这些分区。

 类似资料:
  • 问题内容: 我的孩子做了一项家庭作业,用Java编写二十一点。我为他提供了一些帮助,但大部分时间他都是自己完成的,实际上效果还不错。他甚至发现了我在计算手值时没有看到的错误。但是,他还没有解决一个难题,我能想到的每个解决方案都非常复杂,而且远远超出了他将能够利用其仍基本的Java技能轻松编写代码的范围。 王牌。实际上,不仅有一个Ace,还有四个Ace,您可能可以一手拿到所有四个Ace。当有一个或多

  • 我正在努力用testNG在类级别声明一个参数。我有一个参数,在方法级别声明时可以正常工作。 因为我正在将测试映射到cucumber步骤定义,并将在方法级别声明一个参数,所以我想将browser参数从方法级别转移到类(全局)级别。因此,在xml文件中,我将浏览器参数从测试级别移动到套件级别,如下所示: 然后在测试类中,我从方法和类中删除了参数,并将声明为。不幸的是,浏览器在运行时找不到,导致一个:

  • 问题内容: 我正在使用GhostDriver的Java绑定对PhantomJS运行Selenium接受测试。 如果PhantomJS请求的网页之一通过console.log登录到Javascript控制台,是否可以捕获或查看这些消息? 我想这的答案很简单,但我无法解决! 问题答案:

  • 我有一个带有RESTAPI的Java服务。从客户端一次可以发送多个请求,我希望通过API在服务器端一次实现一个请求。问题是我需要在服务器端对请求进行优先级排序。换句话说,如果一次发送10个请求,我需要使用优先级队列一次对所有10个请求设置优先级,并根据优先级选择一个请求。可以从同一个客户端或多个客户端发送请求。 现在,在上面的代码中,一次可以为一个请求执行这个。 有什么解决办法吗?我见过一些多线程

  • 问题内容: 我想生成一个以字母作为键的字典,类似 生成该字典而不是我必须键入它的快速方法是什么? 谢谢你的帮助。 编辑 谢谢大家的解决方案:) nosklo的 解决方案可能是最短的 另外,感谢您提醒我有关Python字符串模块的信息。 问题答案: 我发现此解决方案更加优雅: