当前位置: 首页 > 知识库问答 >
问题:

Spring集成-MQTT订阅问题

林和煦
2023-03-14

我正在尝试实现Spring与MQTT的集成。我正在使用Mosquitto作为MQTT经纪人。并在下面的链接中提供了文档的引用。我已经创建了一个项目并添加了所有所需的jar文件。当我执行MQTTJavaApplication时。

public class MqttJavaApplication {

     public static void main(String[] args) {
            new SpringApplicationBuilder(MqttJavaApplication.class)
                    .web(false)
                    .run(args);
        }

        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public MqttPahoMessageDrivenChannelAdapter inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "test",
                                                     "sample");
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {

                @Override
                public void handleMessage(Message<?> message) {
                    System.out.println("Test##########"+message.getPayload());
                }

            };
        }
}

当我通过MQTT代理发布消息时,我收到以下错误。

[2015-11-23 10:08:19.545] boot - 8100  INFO [main] --- AnnotationConfigApplicationContext: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@2d0e1c: startup date [Mon Nov 23 10:08:19 IST 2015]; root of context hierarchy
[2015-11-23 10:08:19.831] boot - 8100  INFO [main] --- PropertiesFactoryBean: Loading properties file from URL [jar:file:/D:/IoTWorkspace/MQTTTest/WebContent/WEB-INF/lib/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
[2015-11-23 10:08:20.001] boot - 8100  INFO [main] --- DefaultLifecycleProcessor: Starting beans in phase 1073741823
[2015-11-23 10:08:20.104] boot - 8100  INFO [main] --- MqttPahoMessageDrivenChannelAdapter: started inbound
[2015-11-23 10:08:20.112] boot - 8100  INFO [main] --- MqttJavaApplication: Started MqttJavaApplication in 1.602 seconds (JVM running for 2.155)
[2015-11-23 10:13:04.564] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Unhandled exception for GenericMessage [payload=Testing Subscription, headers={timestamp=1448253784563, id=cd5be974-3b19-8317-47eb-1c139725be24, mqtt_qos=0, mqtt_topic=sample, mqtt_retained=false, mqtt_duplicate=false}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.messageArrived(MqttPahoMessageDrivenChannelAdapter.java:262)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:336)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:148)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 10 more
[2015-11-23 10:13:04.613] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Lost connection:MqttException; retrying...
[2015-11-23 10:13:04.614] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Failed to schedule reconnect
java.lang.NullPointerException
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.scheduleReconnect(MqttPahoMessageDrivenChannelAdapter.java:228)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectionLost(MqttPahoMessageDrivenChannelAdapter.java:255)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost(CommsCallback.java:229)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:339)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:171)
    at java.lang.Thread.run(Thread.java:722)
[2015-11-23 10:13:04.617] boot - 8100  INFO [Thread-0] --- AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2d0e1c: startup date [Mon Nov 23 10:08:19 IST 2015]; root of context hierarchy
[2015-11-23 10:13:04.619] boot - 8100  INFO [Thread-0] --- DefaultLifecycleProcessor: Stopping beans in phase 1073741823
[2015-11-23 10:13:04.622] boot - 8100 ERROR [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: Exception while unsubscribing
Client is not connected (32104)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:132)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.unsubscribe(MqttAsyncClient.java:707)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.unsubscribe(MqttAsyncClient.java:682)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStop(MqttPahoMessageDrivenChannelAdapter.java:124)
    at org.springframework.integration.endpoint.AbstractEndpoint.doStop(AbstractEndpoint.java:145)
    at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:128)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
    at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
    at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:118)
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:966)
    at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:893)
[2015-11-23 10:13:04.623] boot - 8100 ERROR [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: Exception while disconnecting
Client is disconnected (32101)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.disconnect(ClientComms.java:405)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:524)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:493)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:500)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStop(MqttPahoMessageDrivenChannelAdapter.java:131)
    at org.springframework.integration.endpoint.AbstractEndpoint.doStop(AbstractEndpoint.java:145)
    at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:128)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
    at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
    at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:118)
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:966)
    at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:893)
[2015-11-23 10:13:04.624] boot - 8100  INFO [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: stopped inbound

添加@SpringBootApplication注释后,请找到下面的堆栈跟踪

[2015-11-26 10:55:46.571] boot - 5524  INFO [main] --- AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@562d4b: startup date [Thu Nov 26 10:55:46 IST 2015]; root of context hierarchy

共有1个答案

王楚青
2023-03-14

是啊!的确如此。如果没有@springbootapplication,我就拥有相同的StackTrace。

Spring Boot的内容包括IntegrationAutoConfiguration,它负责订阅者和TaskSchedulerbean填充。

 类似资料:
  • 更新时间:2019-10-31 10:53:29 节点简介 MQTT订阅节点开启对设备的监听,设备的属性,事件,工作状态等都可以进行监听,可以用来获取设备的实时信息 使用场景 可以用来监听设备当前的工作状态,是否正常,例如设备离线了,设备异常,用户可以方便的知道设备的实时情况 配置项 Topic配置 增加需要侦听的Topic 支持订阅项目下产品/设备的所有有发布权限的Topic,输入格式示例:/s

  • 尝试从Web运行示例spring-integration mqtt项目。我已经在根上下文中导入了mqtt-context。部署war之后,我将运行runmqtt.java文件。但会遇到以下问题。如果在独立模式下运行,相同的文件不会产生任何问题。 mqtt-context.xml http://www.springframework.org/schema/integration http://www

  • 有人知道我在哪里可以得到一些示例MQTT客户端Go(golang)代码,它在无限循环中发布和订阅? null 下面是我正在使用的代码: 我翻阅了GoDocs寻找一些关于如何保持连接畅通的提示,但似乎没有什么相关的。我当然可以在subscribe上执行无限循环,但这似乎效率低下。

  • 谢谢你提前阅读。在我的主要方法中,我有一个出版频道 在执行长时间运行流程的服务中,它创建了一个收费计划,我将频道注入其中 这就是我正在努力解决的问题。我想使用CompletableFuture并在另一个Springbean中获得未来事件的有效负载。我需要一个未来来从消息返回有效负载。我想我想创建一个ServiceActivator作为消息的endpoint,但正如我所说的,我需要它来返回未来a的有

  • About MQTT是IoT(Internet of Things)应用程序中经常使用的轻量经发布/订阅协议。 Further information can be found at http://mqtt.org. The HiveMQ website has a great series on MQTT Essentials. 功能点如下: 完整的MQTT支持 (e.g. last will,

  • 问题内容: 我正在使用Paho发送和接收mqtt消息。到目前为止,发送消息一直没有问题,我正在使用mosquitto接收消息。 现在,我想使用Java客户端读取消息,并且注意到关于接收消息的文档越来越少。 我实现了MqttCallback接口,但仍然无法弄清楚如何阅读已订阅的主题的消息。 到目前为止,这是我的源代码,我可以使用mosquitto_sub读取消息。 问题答案: 您将在代理有时间将消息