当前位置: 首页 > 知识库问答 >
问题:

使用akka-camel和ActiveMQ请求回复

宓英哲
2023-03-14

更新:似乎更简单的测试用例不起作用:只是尝试通过进程内代理将消息从 ActiveMQ 生产者发送到 ActiveMQ 消费者。这是代码:

val brokerURL = "vm://localhost?broker.persistent=false"
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("foo.bar")
val producer = session.createProducer(queue)
val consumer = session.createConsumer(queue)
val message = session.createTextMessage("marco")

producer.send(message)
val resp = consumer.receive(2000)
assert(resp != null)

我正在尝试使用akka-camel实现一个非常简单的请求-回复模式。这是我的(测试台)代码,它试图直接使用activeMQ发送消息并期望响应:

val brokerURL = "vm://localhost?broker.persistent=false"

// create in-process broker, session, queue, etc...
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("myapp.somequeue")
val producer = session.createProducer(queue)
val tempDest = session.createTemporaryQueue()
val respConsumer = session.createConsumer(tempDest)
val message = session.createTextMessage("marco")
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID("myCorrelationID")

// create actor system with CamelExtension
val camel = CamelExtension(system)
val camelContext = camel.context
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerURL))
val listener = system.actorOf(Props[Frontend])

// send a message, expect a response
producer.send(message)
val resp: TextMessage = respConsumer.receive(5000).asInstanceOf[TextMessage]
assert(resp.getText() == "polo")

我为消费者参与者尝试了两种不同的方法。第一个更简单,它尝试使用sender!响应:

class Frontend extends Actor with Consumer {
  def endpointUri = "activemq:myapp.somequeue"
  override def autoAck = false
  def receive = {
    case msg: CamelMessage => {
      println("received %s" format msg.bodyAs[String])
      sender ! "polo"
    }
  }
}

第二次尝试使用Camel模板回复:

class Frontend extends Actor with Consumer {
  def endpointUri = "activemq:myapp.somequeue"
  override def autoAck = false
  def receive = {
    case msg: CamelMessage => {
      println("received %s" format msg.bodyAs[String])
      val replyTo = msg.getHeaderAs("JMSReplyTo", classOf[ActiveMQTempQueue], camelContext)
      val correlationId = msg.getHeaderAs("JMSCorrelationID", classOf[String], camelContext)
      camel.template.sendBodyAndHeader("activemq:"+replyTo.getQueueName(), "polo", "JMSCorrelationID", correlationId)
    }
  }
}

我确实看到了我的actor的接收方法的println()输出,所以ActiveMQ消息正在进入actor,但是我在测试平台中的respConsumer.receive()调用上超时。我已经尝试了很多在回复中指定和不指定标题的组合。我还尝试启用和禁用自动确认

提前谢谢。

共有1个答案

华誉
2023-03-14

事实证明,我需要在JMS代码中调用connection.start()。

 类似资料:
  • 我试图让Wildfly和ActiveMQ与Apache Camel一起工作,让我解释一下这个场景。每小时都会有一个camel批处理轮询一个FTP服务器,获取文件并将其发送到ActiveMQ代理。代理实现了两个路由:和。消息被排入,如果它们还没有准备好发送,则被路由到。中的消息由camel处理器出列和翻译,并在准备发送时排队到。中的准备发送消息被发送到Wildfly,MDB在那里执行某些操作。除了最

  • 我想使用 wsdl 文件从 Camel 调用第三方网络服务,而无需生成任何客户端代码(因为我认为如果我提供 wsdl 文件,那么 Camel 能够生成我们之前生成的客户端,并且在我们的旧代码中工作) 经过长时间的搜索,我找到了一些帮助我实现目标的代码 代码为 这工作正常,但这里我是手动生成soap信封 wsdl文件是 现在我想生成肥皂请求,而不是静态的 请帮助我 先谢谢了

  • 我正在测试将JMS请求/应答与Camel和ActiveMQ结合使用的示例。当camel为您创建监听器时,我可以让这个示例起作用。即 我现在遇到的问题是,我无法让JMS请求/回复与存在于Camel jvm之外的MessageListener一起工作。等待回复的连接超时。我确保MessageListener正在将回复发送到回复队列,我也在设置相关ID。我在这里做错了什么?我已经谷歌了几天,试图解决这个

  • 我们有一个应用程序接口实现在裸骨Scala Akka HTTP中--一对路由前面的大量计算(CPU和内存密集)。没有集群-所有运行在一个强壮的机器上。计算量相当大--一个独立请求可能需要60多秒才能完成。我们并不那么在意速度。没有阻塞IO,只是大量的CPU处理。 当我开始对它进行性能测试时,出现了一个有趣的模式:假设请求A1、A2、...、A10通过。它们会大量使用资源,结果Akka会为溢出的请求