当前位置: 首页 > 知识库问答 >
问题:

通过QPID JMS从azure servicebus接收来自主题订阅的消息

颜高朗
2023-03-14

我按照本教程阅读来自给定订阅的消息。我创建了一个像这样的接收类-

public class Receive implements MessageListener{

    private static boolean runReceiver = true;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

    public Receive() throws Exception {
        Hashtable<String, String> env = new Hashtable<String, String>();

        env.put("connectionfactory.SBCF", "amqps://All:[shared-access-key]@[namespace].servicebus.windows.net?amqp.idleTimeout=120000");
        env.put("topic.TOPIC", "job/Subscriptions/job-test-subscription");

        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

        Context context = new InitialContext(env);

        // Look up ConnectionFactory and Queue
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        Destination queue = (Destination) context.lookup("TOPIC");

        // Create Connection
        connection = cf.createConnection();

        // Create sender-side Session and MessageProducer
        sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        sender = sendSession.createProducer(queue);

        if (runReceiver) {
            // Create receiver-side Session, MessageConsumer,and MessageListener
            receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            receiver = receiveSession.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
        }
    }
    public void close() throws JMSException {
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            Receive simpleReceiver = new Receive();
    } catch (Exception e) {
        e.printStackTrace();
    }
}   

在执行这个程序时,我得到了这个错误-

    log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
javax.jms.JMSException: hostname can't be null
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:172)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:204)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:191)
    at Receive.<init>(Receive.java:41)
    at Receive.main(Receive.java:63)
Caused by: java.io.IOException: hostname can't be null
    at org.apache.qpid.jms.util.IOExceptionSupport.create(IOExceptionSupport.java:45)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$2.run(AmqpProvider.java:217)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: hostname can't be null
    at java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
    at java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254)
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:126)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:167)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$2.run(AmqpProvider.java:195)
    ... 7 more

错误指向此行-connection=cf.createConnection()还想知道像这样提供我的主题和订阅名称-作业/订阅/作业测试订阅是否合适。

很抱歉发了这么长的帖子。

共有1个答案

秦权
2023-03-14

从这里开始,根据小节服务总线实体地址作业/订阅/作业-测试-订阅值是您的订阅地址,而不是主题。请更改您的网址和主题属性如下,然后重试。

connectionfactory.SBCF=amqps://All:[shared-access-key]@[namespace].servicebus.windows.net/job/Subscriptions/job-test-subscription?amqp.idleTimeout=120000
topic.TOPIC=job

希望能有帮助。

 类似资料:
  • 我有一个超时选项,只想在超时前接收消息。 如果您能解释下面的代码是如何工作的,以及我如何修改下面的代码以在特定的时间框架内接收消息,并且一旦我的超时已经到达就停止接收,这将是很有帮助的。

  • 我有一个调查多个主题的消费者。对于这个问题,我限制了每个主题一个分区。假设当消费者开始轮询时,每个主题都有一些数据。阅读的顺序是什么? 是循环赛吗?它是从第一个读到下一个吗?我使用进行轮询。

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的

  • 我对Instagram显示器的任何人都不抱希望,但谁知道呢... 你们用Instagram的实时照片更新吗?我从不接收用户媒体更新。我成功订阅了它,Instagram向我的服务器发送HTTP GET确认调用,我重播了它,之后再也没有收到任何更新。 思想?

  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很

  • 我正试图找到一种方法,使用Firebase云消息发送通知给我的应用程序的所有用户,但我有一个仅限web的应用程序。我见过一些针对Android/iOS的解决方案,基本上是让用户自动订阅一个名为“allDevices”的主题,然后向订阅该主题的所有用户发送通知。我似乎找不到任何关于如何让基于web的用户订阅主题的文档。有人知道这是否可能吗?如果有,是否有我遗漏的文件可以涵盖这一点? 谢谢!