当前位置: 首页 > 知识库问答 >
问题:

Apache Camel-Kafka组件-单生产者多消费者

潘阳舒
2023-03-14

我创建了两个apache camel(blueprint XML)kafka项目,一个是kafka-producer(接受请求并将其存储在kafka服务器中),另一个是kafka-consumer(从kafka服务器获取ups消息并处理它们)。

这个设置对单个主题和单个消费者都很有效。然而,我如何在同一个Kafka主题中创建单独的消费者组?如何在不同的消费者群体中路由同一主题中的多个消费者特定消息?任何帮助都很感激。谢谢你。

共有1个答案

商飞航
2023-03-14

你的问题很笼统,因为你不太清楚你想解决的问题是什么,所以很难理解是否有更好的方法来实现解决方案。

无论如何,让我们首先说,就我所能理解的,您正在寻找一个选择性消费者(EIP),这是Kafka和消费者API不支持的开箱即用的东西。选择性使用者可以根据生产者预先放置的特定选择器的值来选择从队列或主题中选择什么消息。这个特性也必须在message broker中实现,但是kafka没有这样的功能。

Kafka确实实现了纯pub/sub和queue之间的混合解决方案。也就是说,您所能做的就是与一个或多个消费者群体(稍后会有更多)订阅该主题,并通过检查消息本身过滤掉所有您不感兴趣的消息。在消息传递和EIP领域,这种模式被称为筛选器数组。可以想象,在消息广播给所有订户之后会发生这种情况;因此,如果该解决方案不适合您的需求或上下文,那么您可以考虑实现一个基于内容的路由器,该路由器旨在将消息分派给仅在您的集中控制下的消费者子集(这意味着特定于消费者的中间通道,当然可以是其他Kafka主题或SEDA/VM队列)。

转到第二个问题,这里是官方的Kafka组件网站:https://camel.apache.org/components/latest/kafka-component.html。为了创建不同的消费者组,您只需定义多个路由,每个路由都有一个专用的GroupID。通过添加groupdId属性,您将通知消费者组协调员(驻留在Kafka代理中)多个单独的消费者组的存在,代理将使用这些消费者组来区分和单独处理他们(通过向他们发送存储在主题中的每个日志消息的副本)...

下面是一个例子:

public void configure() throws Exception {
    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=myFirstConsumerGroup"
            .log("Message received by myFirstConsumerGroup : ${body}");

    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=mySecondConsumerGroup"
            .log("Message received by mySecondConsumerGroup : ${body}");

}

正如您所看到的,我在同一个RouteBuilder中创建了两条路由,更不用说在同一个Java进程中了。在我能想到的大多数用例中,这是一个非常糟糕的设计决策,因为没有单一的责任、分离的关注点,而且它们不会扩展。但同样,这取决于您的需求/上下文。出于完整性考虑,请考虑查看所有其他Kafka组件属性,因为可能有许多其他您感兴趣的配置,如每个组的使用者线程数。我试着保持高水平,为了引发讨论...如果你有新的更新,我会编辑我的答案。希望我能帮上忙!

 类似资料:
  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa