当前位置: 首页 > 知识库问答 >
问题:

Camel MQTT组件-动态endpoint数量有限?

赵雅懿
2023-03-14

我们正在使用Camel MQTT组件订阅几个主题并执行路由逻辑。

在某些情况下,我们使用动态endpoint在运行时设置发布主题名称,该名称最多可用于15个并发连接的endpoint。

一旦创建第16个动态endpoint(toD()-call),连接到mqtt代理时就会出现超时。

org.apache.camel.FailedToCreateProducerException: Failed to create Producer for endpoint: <my connection here>&connectAttemptsMax=1&reconnectAttemptsMax=1&publishTopicName=<topic>. Reason: java.util.concurrent.TimeoutException
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:579) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:406) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:119) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.component.mqtt.MQTTConsumer.processExchange(MQTTConsumer.java:51) [camel-mqtt-2.18.3.jar:2.18.3]
    at org.apache.camel.component.mqtt.MQTTEndpoint$2.onPublish(MQTTEndpoint.java:257) [camel-mqtt-2.18.3.jar:2.18.3]
    at org.fusesource.mqtt.client.CallbackConnection$8.onPublish(CallbackConnection.java:521) [mqtt-client-1.14.jar:1.14]
    at org.fusesource.mqtt.client.CallbackConnection.toReceiver(CallbackConnection.java:911) [mqtt-client-1.14.jar:1.14]
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:808) [mqtt-client-1.14.jar:1.14]
    at org.fusesource.mqtt.client.CallbackConnection.access$1700(CallbackConnection.java:73) [mqtt-client-1.14.jar:1.14]
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:425) [mqtt-client-1.14.jar:1.14]
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:713) [hawtdispatch-transport-1.22.jar:1.22]
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:592) [hawtdispatch-transport-1.22.jar:1.22]
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) [hawtdispatch-1.22.jar:1.22]
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) [hawtdispatch-1.22.jar:1.22]
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) [hawtdispatch-1.22.jar:1.22]
Caused by: java.util.concurrent.TimeoutException: null
    at org.fusesource.mqtt.client.Promise.await(Promise.java:83) ~[mqtt-client-1.14.jar:1.14]
    at org.apache.camel.component.mqtt.MQTTEndpoint.connect(MQTTEndpoint.java:342) ~[camel-mqtt-2.18.3.jar:2.18.3]
    at org.apache.camel.component.mqtt.MQTTProducer.doStart(MQTTProducer.java:38) ~[camel-mqtt-2.18.3.jar:2.18.3]
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:75) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.impl.DefaultCamelContext.deferStartService(DefaultCamelContext.java:1316) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.impl.DefaultCamelContext.doAddService(DefaultCamelContext.java:1244) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.impl.DefaultCamelContext.addService(DefaultCamelContext.java:1214) ~[camel-core-2.18.3.jar:2.18.3]
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:577) ~[camel-core-2.18.3.jar:2.18.3]
    ... 20 common frames omitted

如果Mqtt组件的并发连接endpoint是在运行时创建的,那么这些endpoint是否有限制?

静态endpoint似乎没有限制。我们能够启动超过15条路由,endpoint订阅主题并消耗消息

更新:作为备用方案,我们尝试使用camel-paho组件,它可以使用相同的代理设置和动态主题。

对这个问题有什么帮助吗?

共有1个答案

游皓
2023-03-14

虽然这可能是mqtt组件的一个限制,但您可以使用一个endpoint并在头中动态设置发布主题
MQTT组件文档建议使用CamelMQTTPublishTopic头(从Camel 2.14开始),因此您的路线可能如下

<route>
    <from uri="direct:start" />
    <to uri="bean:publishTopicProcessor" />
    <to uri="mqtt:singleEndpoint" />
</route>

在处理器内部,您可以实现所需的所有逻辑

public void process(Exchange exchange) {
    String dynamicTopic = "your/dynamic/topic";
    // set the topic you want in the string
    exchange.getIn().setHeader("CamelMQTTPublishTopic", dynamicTopic);
}

现在,单一endpoint将发布不同主题的消息。

在这个例子中,我使用Camel 2.18做了完全相同的事情

 类似资料:
  • 问题内容: 我制作了一个便签程序,可以帮助您学习JavaFX。它通过XML保存该类,并在启动时查找XML文件,并将其添加到名为AllCards的NoteCardSet类型的ArrayList,即NoteCards的ArrayList。有了这个,我制作了许多动态按钮,使它们宽了4列。这是该代码: 显然,这可以在JavaFX中创建,但是可以在FXML中创建吗? 问题答案: 不,您不能在FXML中执行此

  • 我目前正在开发一个基于Camel的测试工具应用程序,它处理来自多个文件夹的文件组,并与本地存储库中的文件进行比较。 有没有办法从骆驼路由中的终点动态更改文件夹位置?我想使用一条路由来轮询来自多个文件夹的文件。

  • 在中,我正在填充和。现在,我需要定义一个新的路由,它使用来自上传队列的消息,并复制一个本地文件夹(基于在上一个路由中生成的Id),并将其上传到目标文件夹,该文件夹是一个ftp服务器(这也在上一个路由中填充) 那么,如何设计一条新的路线,其中从和到终点都是动态的,如下所示?

  • array.h #include<stdio.h> #include<stdlib.h> struct data { int *p;//指针保存数组的起始点 int length;//保存数组的长度 int stat;//0代表无序,1代表有序从小到大,2有序从大到小 int reallength;//实际分配的内存长度 };

  • 通过使用保留的<component>元素,并对其is特性进行动态绑定,你可以在同一个挂载点动态切换多个组件: var vm = new Vue({ el: '#example', data: { currentView: 'home' }, components: { home: { /* ... */ }, posts: { /* ... */ },