您有两个选项(示例代码用Java编写):
1)使用“计时器”(或更高级的quartz)Camel组件。然后,您需要一个非常简单的计时器执行器,它在每个句点调用一个新的HTTProducer执行器:
public class TimerConsumer extends UntypedConsumerActor{
//Generates an event every 60 seconds:
@Override
public String getEndpointUri() {
return "timer://foo?fixedRate=true&period=15000";
}
@Override
public void onReceive(Object m) throws Exception {
if (m instanceof CamelMessage){
System.out.println("New Event (every 15sec)");
Akka.system().actorOf(Props.create(HTTProducer.class)).tell("http://google.com", getSelf());
}
}
}
2)使用Akka调度程序
//Somewhere in the beginning of your application (Global.java for Play Framework 2)
ActorRef httpActor = Akka.system().actorOf(Props.create(HTTProducer.class));
//A message every 15s to the httpActor
Akka.system().scheduler().schedule(Duration.Zero(),
Duration.create(15, TimeUnit.SECONDS), httpActor, "http://google.com",
Akka.system().dispatcher(), null);
public class HTTProducer extends UntypedProducerActor {
@Override
public String getEndpointUri() {
return "http://empty.com";
}
@Override
public Object onTransformOutgoingMessage(Object m) {
if (m instanceof String){
Map<String,Object> headers=new HashMap<>();
headers.put(Exchange.HTTP_URI, (String)m);
headers.put(Exchange.HTTP_METHOD, "GET");
return super.onTransformOutgoingMessage(new CamelMessage(null,headers));
}
return super.onTransformOutgoingMessage(m);
}
@Override
public void onRouteResponse(Object m) {
if (m instanceof CamelMessage){
CamelMessage message=(CamelMessage) m;
System.out.println("Response: " + message.getBodyAs(String.class, getCamelContext()));
System.out.println("Code: " + message.headers().get(Exchange.HTTP_RESPONSE_CODE).get());
}
}
这是我认为我应该用于这种方法的布局:并且为了适应404路由,还可以使用。现在,如果我的Akka流知识对我有用的话,我需要使用来处理这样的事情,然而,这就是我被困住的地方。 在中,我可以为不同的endpoint进行简单的映射和flatMap,但在流中,这意味着将流划分为多个流,我不太确定该如何进行。我想过使用UnzipWith和Options或通用广播。 如能在这方面提供任何帮助,将不胜感激。 如果
我们尝试在host-connection-pool下调整池、actor实例和所有其他参数的大小,但没有更好的效果。 欢迎任何建议!
更新:似乎更简单的测试用例不起作用:只是尝试通过进程内代理将消息从 ActiveMQ 生产者发送到 ActiveMQ 消费者。这是代码: 我正在尝试使用akka-camel实现一个非常简单的请求-回复模式。这是我的(测试台)代码,它试图直接使用activeMQ发送消息并期望响应: 我为消费者参与者尝试了两种不同的方法。第一个更简单,它尝试使用响应: 第二次尝试使用Camel模板回复: 我确实看到了
我确实尝试实现了这两种解决方案,但在实现的每个阶段都有许多设计选择,因此即使在一条“正确”的道路上,似乎也很容易搞砸。 1虽然我相信它是可以忽略不计的,而且是akka-http服务器运行的相同方式。
我有一些用akka写的演员,我想通过ServiceMix让他们互动。很难,我对这些技术是如何交互的有点困惑。这是我目前所理解的: < li>akka让我写一些演员: < ul > < li >生产者发送消息 < li >消费者接收消息 < li >发送和接收的非类型化编辑器 每个参与者将在一个固定的endpoint上可用,在ServiceMix中定义为route 现在我的问题是: 谁自动在jett