我正在尝试使用Akka和Camel的竞争性事件消费者实现。我使用Akka 2.3.2和Camel 5.8.0。我正在将camel连接到ActiveMQ代理,并使用生产者从另一端生成消息。在以下代码中,EventManager是创建消费者池的主机,Event处理器是消息处理演员。
EventManager.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.japi.Creator;
import akka.routing.RoundRobinPool;
public class EventManager {
private final ActorSystem akkaSystem;
private CamelContext camelContext = null;
private ActorRef workRouter;
public EventManager(ActorSystem system) {
akkaSystem = system;
initialize();
}
public void initialize() {
Camel camel = CamelExtension.get(akkaSystem);
camelContext = camel.context();
ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
activemqComponent.setDeliveryPersistent(false);
camelContext.addComponent("activemq",activemqComponent );
int numOfWorkers = 5;
// distributing the message processing across a pool of 5 actors
ActorRef workRouter =
akkaSystem.actorOf(new RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)),
"workRouter");
}
}
EventProcessor.java
import org.apache.log4j.Logger;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
public class EventProcessor extends UntypedConsumerActor{
private static final Logger LOGGER = Logger.getLogger(EventProcessor.class);
public EventProcessor() {
}
public void onReceive(Object message) {
if(message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message;
String body = camelMessage.getBodyAs(String.class, getCamelContext());
LOGGER.info("Message handled by :" +this.getSelf().path().name());
LOGGER.info("Message body:" + body);
}
}
public boolean autoAck() {
return true;
}
public String getEndpointUri() {
return "activemq:queue:dest";
}
}
我看到的问题是,消息似乎被单个参与者使用,而没有在池中分发。我需要创建一个单独的骆驼路线来分发吗?我还希望将处理分布在不同的物理节点上。感谢您的意见和最佳实践。
尝试在AMQendpoint上设置并发使用者
return "activemq:queue:dest?concurrentConsumers=50";
..或在endpoint上启用异步使用者
return "activemq:queue:dest?asyncConsumer=true";
此外,您似乎弄错了版本信息。没有Camel版本5.8。我假设这是AMQ版本。
我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者
我已经用MassTransit实现了一个简单的发布者/使用者集,我想让使用者从同一个队列中读取消息。但是,当我运行它时,我看到很大一部分消息被发送到错误队列,而不是被消耗。从我看到的讨论(所以,论坛)来看,对于RabbitMQ来说,这应该非常非常简单(只需指向相同的队列),但它并不起作用。是否有应该设置的附加配置? 这是我的出版商 还有我的消费者
我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?
问题内容: 我有2个服务。他们两个都需要订阅相同的频道。 这两个服务是负载平衡的。每个服务都在多个服务器上运行。 因此,如何确定每个服务只有1个实例消耗该通道的消息。 Redis支持此功能吗? 谢谢 问题答案: Pubsub不能这样工作- 消息会发送到所有已连接的已订阅客户端。但是,您可以对其进行设置,以使该频道是列表更新的通知。这样,所有客户端都会收到消息,但是只有一个客户端可以使用LPOP从列
例如生产比较慢,而消费比较快,就会导致消费者消费到错误数据 package main import ( "fmt" "math/rand" "sync" "time" ) // 创建一把互斥锁 var lock = sync.Mutex{} // 定义缓冲区 var sce []int = make([]int, 10) // 定义生产者 fun
[04/27/2014 18:09:05.518][ReadScheduler-Akka.actor.Default-Dispatcher-3][Akka://ReadScheduler/User/Collector]从参与者[Akka://ReadScheduler/User/Executor#2127791644]到参与者[Akka://ReadScheduler/User/Collector