我按照本教程阅读来自给定订阅的消息。我创建了一个像这样的接收类-
public class Receive implements MessageListener{
private static boolean runReceiver = true;
private Connection connection;
private Session sendSession;
private Session receiveSession;
private MessageProducer sender;
private MessageConsumer receiver;
private static Random randomGenerator = new Random();
public Receive() throws Exception {
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.SBCF", "amqps://All:[shared-access-key]@[namespace].servicebus.windows.net?amqp.idleTimeout=120000");
env.put("topic.TOPIC", "job/Subscriptions/job-test-subscription");
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(env);
// Look up ConnectionFactory and Queue
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("TOPIC");
// Create Connection
connection = cf.createConnection();
// Create sender-side Session and MessageProducer
sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
sender = sendSession.createProducer(queue);
if (runReceiver) {
// Create receiver-side Session, MessageConsumer,and MessageListener
receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
receiver = receiveSession.createConsumer(queue);
receiver.setMessageListener(this);
connection.start();
}
}
public void close() throws JMSException {
connection.close();
}
public void onMessage(Message message) {
try {
System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
Receive simpleReceiver = new Receive();
} catch (Exception e) {
e.printStackTrace();
}
}
在执行这个程序时,我得到了这个错误-
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
javax.jms.JMSException: hostname can't be null
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:172)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:204)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:191)
at Receive.<init>(Receive.java:41)
at Receive.main(Receive.java:63)
Caused by: java.io.IOException: hostname can't be null
at org.apache.qpid.jms.util.IOExceptionSupport.create(IOExceptionSupport.java:45)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$2.run(AmqpProvider.java:217)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: hostname can't be null
at java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
at java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254)
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:126)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:167)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$2.run(AmqpProvider.java:195)
... 7 more
错误指向此行-connection=cf.createConnection()
还想知道像这样提供我的主题和订阅名称-作业/订阅/作业测试订阅
是否合适。
很抱歉发了这么长的帖子。
从这里开始,根据小节服务总线实体地址
,作业/订阅/作业-测试-订阅
值是您的订阅地址,而不是主题。请更改您的网址和主题属性如下,然后重试。
connectionfactory.SBCF=amqps://All:[shared-access-key]@[namespace].servicebus.windows.net/job/Subscriptions/job-test-subscription?amqp.idleTimeout=120000
topic.TOPIC=job
希望能有帮助。
我有一个超时选项,只想在超时前接收消息。 如果您能解释下面的代码是如何工作的,以及我如何修改下面的代码以在特定的时间框架内接收消息,并且一旦我的超时已经到达就停止接收,这将是很有帮助的。
我有一个调查多个主题的消费者。对于这个问题,我限制了每个主题一个分区。假设当消费者开始轮询时,每个主题都有一些数据。阅读的顺序是什么? 是循环赛吗?它是从第一个读到下一个吗?我使用进行轮询。
在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的
我对Instagram显示器的任何人都不抱希望,但谁知道呢... 你们用Instagram的实时照片更新吗?我从不接收用户媒体更新。我成功订阅了它,Instagram向我的服务器发送HTTP GET确认调用,我重播了它,之后再也没有收到任何更新。 思想?
我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很
我正试图找到一种方法,使用Firebase云消息发送通知给我的应用程序的所有用户,但我有一个仅限web的应用程序。我见过一些针对Android/iOS的解决方案,基本上是让用户自动订阅一个名为“allDevices”的主题,然后向订阅该主题的所有用户发送通知。我似乎找不到任何关于如何让基于web的用户订阅主题的文档。有人知道这是否可能吗?如果有,是否有我遗漏的文件可以涵盖这一点? 谢谢!