当前位置: 首页 > 知识库问答 >
问题:

Akka演员Java竞争消费者

姜鹏程
2023-03-14

我正在尝试使用Akka和Camel的竞争性事件消费者实现。我使用Akka 2.3.2和Camel 5.8.0。我正在将camel连接到ActiveMQ代理,并使用生产者从另一端生成消息。在以下代码中,EventManager是创建消费者池的主机,Event处理器是消息处理演员。

EventManager.java

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.camel.component.ActiveMQComponent;
    import org.apache.camel.CamelContext;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.actor.UntypedActor;

    import akka.camel.Camel;
    import akka.camel.CamelExtension;
    import akka.japi.Creator;
    import akka.routing.RoundRobinPool;

    public class EventManager {




        private final ActorSystem akkaSystem;

        private CamelContext camelContext = null;

        private ActorRef workRouter;

        public EventManager(ActorSystem system) {
            akkaSystem = system;

            initialize();
        }

        public void initialize() {

            Camel camel = CamelExtension.get(akkaSystem);

            camelContext = camel.context();

            ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
            activemqComponent.setDeliveryPersistent(false);
            camelContext.addComponent("activemq",activemqComponent );


            int numOfWorkers = 5;

// distributing the message processing across a pool of 5 actors
            ActorRef workRouter =
                      akkaSystem.actorOf(new RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)), 
                        "workRouter");

        }

    }

EventProcessor.java

import org.apache.log4j.Logger;



import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;

public class EventProcessor extends UntypedConsumerActor{

    private static final Logger LOGGER = Logger.getLogger(EventProcessor.class);
    public EventProcessor() {

    }

    public void onReceive(Object message) {
        if(message instanceof CamelMessage) {
            CamelMessage camelMessage = (CamelMessage) message;
            String body = camelMessage.getBodyAs(String.class, getCamelContext());
                LOGGER.info("Message handled by :" +this.getSelf().path().name());

                LOGGER.info("Message body:" + body);
        }


    }

    public boolean autoAck() {
        return true;
    }
    public String getEndpointUri() {
        return "activemq:queue:dest";
    }

}

我看到的问题是,消息似乎被单个参与者使用,而没有在池中分发。我需要创建一个单独的骆驼路线来分发吗?我还希望将处理分布在不同的物理节点上。感谢您的意见和最佳实践。

共有1个答案

戚弘和
2023-03-14

尝试在AMQendpoint上设置并发使用者

return "activemq:queue:dest?concurrentConsumers=50";

..或在endpoint上启用异步使用者

return "activemq:queue:dest?asyncConsumer=true";

此外,您似乎弄错了版本信息。没有Camel版本5.8。我假设这是AMQ版本。

 类似资料:
  • 我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者

  • 我已经用MassTransit实现了一个简单的发布者/使用者集,我想让使用者从同一个队列中读取消息。但是,当我运行它时,我看到很大一部分消息被发送到错误队列,而不是被消耗。从我看到的讨论(所以,论坛)来看,对于RabbitMQ来说,这应该非常非常简单(只需指向相同的队列),但它并不起作用。是否有应该设置的附加配置? 这是我的出版商 还有我的消费者

  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?

  • 问题内容: 我有2个服务。他们两个都需要订阅相同的频道。 这两个服务是负载平衡的。每个服务都在多个服务器上运行。 因此,如何确定每个服务只有1个实例消耗该通道的消息。 Redis支持此功能吗? 谢谢 问题答案: Pubsub不能这样工作- 消息会发送到所有已连接的已订阅客户端。但是,您可以对其进行设置,以使该频道是列表更新的通知。这样,所有客户端都会收到消息,但是只有一个客户端可以使用LPOP从列

  • 例如生产比较慢,而消费比较快,就会导致消费者消费到错误数据 package main import ( "fmt" "math/rand" "sync" "time" ) // 创建一把互斥锁 var lock = sync.Mutex{} // 定义缓冲区 var sce []int = make([]int, 10) // 定义生产者 fun

  • [04/27/2014 18:09:05.518][ReadScheduler-Akka.actor.Default-Dispatcher-3][Akka://ReadScheduler/User/Collector]从参与者[Akka://ReadScheduler/User/Executor#2127791644]到参与者[Akka://ReadScheduler/User/Collector