尝试在主题使用者上实现持久功能。为 jms 使用者和客户端 ID 放置了一个名称。(显然添加了持久=“真”)
据我所知。当它第一次运行时,该主题会将消费者注册为“耐用”。
所以基本上我这样做了,部署了生产者和消费者。它被注册为耐用消费者。将消息发布到主题,消费者将获得消息。现在,我取消部署消费者,并发布另一条消息,消费者无论何时起床都应该收到。当我再次部署消费者时,我得到了通用的temp-topic://XXXXXXXXXXXX目的地不存在。
为什么会这样?我不应该收到“丢失”的信息吗?
这是我当前为发布者配置的 jms activemq 连接器配置:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="CE-3.3.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd ">
<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="false" doc:name="Active MQ" maxRedelivery="1" persistentDelivery="true" durable="true" clientId="RoutingTopic">
<reconnect count="5" />
</jms:activemq-connector>
<message-properties-transformer name="MessagePropertiesTransformer" doc:name="Message Properties" overwrite="true">
<add-message-property key="BACKEND_SUBSCRIBER" value="#[flowVars['backend']]"/>
<add-message-property key="MULE_EVENT_TIMEOUT" value="60000"/>
</message-properties-transformer>
<flow name="jmsFlow1" doc:name="jmsFlow1">
<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="jms" doc:name="HTTP"/>
<set-variable variableName="#['id']" value="#[message.inboundProperties['id']]" doc:name="set dynamic id"/>
<set-variable variableName="#['backend']" value="#[message.inboundProperties['backend']]" doc:name="setting backend"/>
<set-payload value="#['This is a message test for id '] #[flowVars['id']]" doc:name="set random string as payload"/>
<choice doc:name="Choice">
<when expression="#[true]">
<processor-chain>
<jms:outbound-endpoint exchange-pattern="request-response" connector-ref="Active_MQ" doc:name="JMS Topic Requestor" transformer-refs="MessagePropertiesTransformer" topic="ESB.Topic">
</jms:outbound-endpoint>
</processor-chain>
</when>
<otherwise>
<processor-chain>
<logger message="This is the default case" level="INFO" doc:name="Logger"/>
</processor-chain>
</otherwise>
</choice>
</flow>
</mule>
这是消费者之一,我得到了2个,但两者基本上是一回事
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="CE-3.3.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd ">
<jms:activemq-connector name="UpCity_Connector" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="false" maxRedelivery="0" doc:name="Active MQ" clientId="RandomName" durable="true"/>
<flow name="jmsAdapterConsumerFlow1" doc:name="fmsAdapterConsumerFlow1">
<jms:inbound-endpoint exchange-pattern="request-response" connector-ref="UpCity_Connector" doc:name="JMS Replier Consumer" topic="ESB.Topic">
<jms:selector expression="BACKEND_SUBSCRIBER='randombackend'"/>
</jms:inbound-endpoint>
<set-payload value="#[payload + ' returned from a random backend']" doc:name="Add string to payload"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>
谢谢。
JMS中的持久消息传递有点棘手,因为您的配置/代码必须满足几个标准才能正确工作:
我不确定mule是否创建了持久订户。但是,您可以在ActiveMQ的web控制台中检查。在那里,您可以获得当前持久订阅的列表。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我试图更好地理解如何在酒吧子模型中与多个消费者进行话题交流。假设我有 一个名为Log的持久队列 发布者主题交换,它将所有日志消息(日志。#)路由到此队列日志 我可以让多个消费者根据路由密钥从上述发布者队列“log”读取日志消息。e、 g.消费者C1-仅登录日志。x条信息,C2获取日志。有意思的消息。。等等 简言之,是否可以让多个消费者从同一队列中读取信息,但只获取经过过滤的消息,或者每个消费者必须
我正在使用Kafka Spring Integration发布和使用Kafka消息。我看到有效负载从生产者正确地传递到消费者,但是头信息在某个地方被覆盖了。 我得到以下标题: 我在生产者端构建消息,如下所示: 下面是生产者的标题信息: 知道为什么头会被不同的值覆盖吗?
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。