当前位置: 首页 > 知识库问答 >
问题:

Mule JMS主题和ActiveMQ配置

贺博厚
2023-03-14

我使用Mule ESB来设计一个过程,通过这个过程可以向一个主题发布消息。订阅者将收听主题并接收消息。每个用户对消息的反应不同。这里的目标是能够从HTTP向主题发送测试消息,以测试订阅者。

下面是如何配置JMS连接:

 <!-- JMS Topic connector -->
<jms:activemq-connector name="jmsTopicConnection" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ2" durable="true" numberOfConcurrentTransactedReceivers="2"/>

这是流程:

<flow name="auditJMSServiceFlow">
<http:listener config-ref="HTTP" path="/Audit/Activity" responseStreamingMode="ALWAYS" doc:name="HTTP"/>
<set-variable variableName="#['id']" value="#[message.inboundProperties['id']]" doc:name="set dynamic id"/>
<set-payload value="===TOPIC===" doc:name="Set Payload" />

 <request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
    <jms:transaction action="ALWAYS_BEGIN" />
    <!-- Not required to explicitly have this element.  Mule will put this in implicitly. -->
    <!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>

<json:object-to-json-transformer doc:name="transform JMS message to JSON"/>
<json:validate-schema schemaLocation="resource://AuditMsgSchema.json" doc:name="Validate Json Schema"/>
<component class="com.baml.panther.audit.service.impl.AuditServiceImpl" doc:name="Java"/>


<default-exception-strategy>
    <commit-transaction exception-pattern="com.foo.ExpectedExceptionType"/> 
    <jms:outbound-endpoint queue="dead.letter" connector-ref="jmsConnection"> 
        <jms:transaction action="JOIN_IF_POSSIBLE" />
    </jms:outbound-endpoint>
</default-exception-strategy>

<logger message="=== #[message.payload] received #[org.mule.util.DateUtiles.getTimeStamp('dd-MM-yyyy_HH-mm-ss.SSS')]" level="INFO" doc:name="Logger"/>

当我运行测试时,我得到以下错误:

任何建议将不胜感激。

罗斯

共有2个答案

盖玉石
2023-03-14

问题是您不能将消息源作为请求-回复中的第一个消息处理器。请求回复允许您对JMS之类的异步协议进行同步调用。如果您想在放置请求-回复的地方向消息代理发送消息,只需放置一个JMS出站endpoint。如果您想要使用来自JMS主题的消息,您必须将JMS入站endpoint作为流中的第一个消息处理器。

白越
2023-03-14

对于错误:您的estest-回复范围缺少出站endpoint。您只有入站endpoint(jms: ininding-endpoint)。您还需要提供出站endpoint。

<request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
    <jms:transaction action="ALWAYS_BEGIN" />
    <!-- Not required to explicitly have this element.  Mule will put this in implicitly. -->
    <!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>

不确定你的目标是什么,但如果你只放一个jms:出站入口(而不是整个请求-回复块),你可以向JMS主题发送一条消息

 类似资料:
  • 我试图在ActiveMQ Artemis 2.13上配置HA。我试着从一个简单的主和备份开始。我已经多次阅读关于集群和HA的文档,但我仍然不确定我在做什么。我还研究了replicated-failback java示例。 从客户端,我必须为主节点和备份节点指定连接信息吗?这个示例让我感到困惑,因为它看起来URL/连接是通过输入参数传递给java程序的,而我不确定它们来自哪里。 在主控制台中,一切看

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的

  • 此配置适用于所有 基本主题。 常用配置 日间模式 - 夜间模式 classic 主题默认提供了对日间模式和夜间模式的支持,并在导航条上提供了切换开关。 通过以下配置可以自定义各模式:docusaurus.config.js module.exports = { // ... themeConfig: { // ... colorMode: { // "

  • 我试图在Websphere Application Server中添加ActiveMQ作为JMS提供程序。 我已经按照这里描述的说明ActiveMQ5.11和WebSphere Application Server8.5,并根据主题进行了调整。 不幸的是,我不确定需要为主题连接工厂和主题定义在外部JNDI名称中添加什么。 根据IBM文档: “外部JNDI名称用于将队列绑定到应用程序服务器名称空间的

  • 我们在堆栈中使用Kafka、Kafka连接和模式注册表。版本为2.8.1(融合6.2.1)。我们使用Kafka connect的配置(key.converter和value.converter),其值为:io.confluent.connect.avro.AvroConverter。 它自动注册主题的新模式。但有一个问题,AvroConverter没有为新模式指定主题级兼容性,当我们试图通过RES