当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ,WebSocket和踩踏

田巴英
2023-03-14

我无法编写一个能够通过WebSocket监听STOMP消息的服务器。我的问题在于stomp协议和JMS消费者的创建。

以下代码在createConnection上失败

class StompDemo {
  val uri = "ws://localhost:61614"
  val topicName = "mytopic"
  val broker = new BrokerService
  broker.addConnector(uri)
  val topic = new ActiveMQTopic(topicName)
  val topics = Array[ActiveMQDestination](topic)
  broker.setDestinations(topics)
  broker.start
  println("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  println("Started connection")

  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createTopic(topicName)
  val consumer = session.createConsumer(destination)
  println("Created consumer")

  while(true) {
    println("Waiting for next message")
    val message = consumer.receive
  }
}

有以下例外:

Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!

你能指出这个代码的问题吗?如何使用 AMQ 以编程方式配置 JMS 侦听器,使其能够通过 WebSocket/踩踏来配置队列或主题?

谢谢

新更新的代码因ActiveMQ传输而失败:tcp:///127.0.0.1:51309@6969 []传输连接到:tcp://127.0.0.1:51309失败:Java . io . io异常:未知数据类型:47我猜这与二进制与基于文本有关。

仍在调查失败的原因:

package org.tj.amq

import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage

//
// http://www.massapi.com/class/br/BrokerService.html
//

object AMQStompDemo extends MainLoop with Logging {
  <<("AMQ Stomp Demo")
  val uri = "tcp://localhost:6969"
  val broker = new BrokerService
  broker.setPersistent(false)
  broker.setUseJmx(false)
  broker.addConnector(uri)
  broker.start
  <<("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  connection.start
  <<("Started connection")
  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createQueue("test")
  val consumer = session.createConsumer(destination)

  while(true) {
    <<("Ready to receive next message ...")
    val message = consumer.receive
    message match {
      case tm:TextMessage => <<(s"Received text message ${tm.getText}")
      case _ => <<(s"Received another message type $message")
    }
  }

  def main(args: Array[String]): Unit = {}
}

trait Logging {
  def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}

trait MainLoop extends Logging {
  new Thread(new Runnable() {
    override def run = {
      <<("Starting main loop")
      while(true) {
        Thread.sleep(1000)
      }
    }
  }).start
}

传奇还在继续。只需添加broker.addConnector("ws://localhost:6971"),我就可以通过WS从浏览器成功连接到队列 /queue/test

现在,剩下的最后一个问题——我得到了回电,但是AMQ给了我这个

[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
    at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

就在收到第一条信息后。

[已编辑]好吧,我被 https://issues.apache.org/jira/browse/AMQ-5155 击中了,所以使用AMQ版本5.9.0工作。

我的感觉是WebSockets的AMQ太不稳定了。嗯,可能在Tomcat中使用了更保守的方法。

共有1个答案

景元徽
2023-03-14

一般来说,你不会在服务器端使用websockets,只是使用普通的STOMP或OpenWire连接。

那个siad,看看你的代码,你似乎正在使用ActiveMQ JMS客户端,它既不说STOMP也不说网页,所以你注定要失败。活动主数据库 JMS 客户端使用开放线协议,并且可以通过 TCP 或 SSL 进行连接(HTTP 可以使用正确的 jar)。

 类似资料:
  • 我们正在将SpringWebSockets集成到我们的应用程序中,我运行了HelloWorld示例,令人惊讶的是,spring为我们连接了一切,以便将服务器端通知推送到客户端。 不过,我有一些简单的问题 1) 队列是如何创建的?我使用的是ActiveMQ,队列名称与我在目的地中指定的不同(例如,像greetings-user3n9\u jn3i)。 2)目标名称是否不同于队列? 3) 我正在使用A

  • 我知道这个问题在这里被问了很多次,我也听了很多对话,但我运气不好。ActiveMQ浏览器和ActiveMQ无法连接。 所以我在这里再次询问,并附上我在当地掌握的全部信息。 JDK 1.8 服务:jmx:rmi:///jndi/rmi://E105756:1616/jmxrmi角色:管理员密码:activemq 无法启动QBrowserV2无法连接到ActiveMQ JMX服务器。 请确保JMX服务

  • 问题内容: 我目前正在玩一个使用websocket与后端通信的有角度的应用程序。我在使angular的数据绑定正常工作时遇到了一些麻烦。 在下面的示例中,我创建了一个服务,该服务创建了websocket连接。如果websocket收到一条消息,我只需将该消息推送到包含所有收到消息的数组中即可。 在我的控制器中,我将该消息数组绑定到作用域,然后用于在我的局部视图中列出所有消息。 服务: 控制器: 部

  • 嗨,伙计们正在使用Websocket和Tomcat8得到以下错误am。

  • WebSockets是Web应用程序的下一代双向通信技术,可在单个套接字上运行,并通过HTML 5兼容浏览器中的JavaScript接口公开。 一旦与Web服务器建立Web Socket连接,就可以通过调用send()方法将数据从浏览器发送到服务器,并通过onmessage事件处理程序从服务器接收数据到浏览器。 以下是创建新WebSocket对象的API。 var Socket = new Web

  • 有人能给我指出一个不错Java例子吗?在这个例子中,践踏客户端被用来连接到ActiveMQ。我还对以下内容感兴趣: 是否支持故障转移? 如何创建持久订阅? Stachp支持异步消息传递吗?示例?我想我必须为它实现MessageListener接口,但我找不到它的示例。