当前位置: 首页 > 知识库问答 >
问题:

将消息从本地代理传递到断开连接的中央代理

白修谨
2023-03-14

我有一个特定的要求,我需要将消息发送到服务器,而服务器并不总是可用的。

为此,我使用了特定于ActiveMQ的代理网络。

目标是有一个本地应用程序A(仅限生产者),它将消息推送到另一个中央应用程序B(仅限消费者)。然而,网络并不总是可用的。因此,应用程序A的代理必须存储消息并等待连接,然后才能将消息发送到应用程序B。所以基本上,A是一个代理,需要在消息可用时将消息转发给B

Broker 的 B 配置包括一个持久主题,该主题正在侦听以使用消息。

正如ActiveMQ的文档中所说,我必须使用静态网桥来实现这一点,这就是我所做的。

注意:我不能让B订阅A,因为A会有多个实例,我不能在B中配置所有实例

这里是我的本地应用程序配置(原始spring):

<!--As said in http://activemq.apache.org/spring-support.html use 
    a pooled conntection along with JMSTemplate -->
 <amq:connectionFactory id="jmsFactory" brokerURL="${jms.broker.local.url}" />
<!--SpringJMSTemplate -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>
<!-- local broker with embedded -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
    init-method="start" destroy-method="stop">
    <property name="brokerName" value="localBroker" />
    <property name="transportConnectorURIs">
        <list>
            <value>${jms.broker.local.url}</value>
        </list>
    </property>
    <property name="networkConnectors">
        <list>
            <ref bean="networkConnector" />
        </list>
    </property>
</bean>

<amq:connectionFactory id="remoteJmsFactory"
    brokerURL="${jms.broker.remote.url}" clientIDPrefix="BRIDGED-TEST" />

<bean id="networkConnector" class="org.apache.activemq.network.DiscoveryNetworkConnector">
    <property name="uri" value="static:(${jms.broker.remote.url})"></property>
    <property name="staticallyIncludedDestinations">
        <list>
            <bean class="org.apache.activemq.command.ActiveMQTopic">
                <constructor-arg type="java.lang.String" value="${jms.topic.sample}"/>
            </bean>
        </list>
    </property>
    <property name="staticBridge" value="true"></property><!-- will deliver content even if no consumer, usefull for durable topic only -->
</bean>

localBroker是一个连接到远程代理的嵌入式代理(您可以从apacheMQ页面下载的应用程序)。

这是中心配置

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>


<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useVirtualDestSubs="true">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="http" uri="http://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485760"/>
    </transportConnectors>
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>

</broker>
<import resource="jetty.xml"/>

那么当我尝试发送/接收消息时会发生什么:

    < li >如果生产者(A)和消费者(B)分别连接到各自的代理,并且代理连接在一起,则工作正常。 < li >如果消费者(B)连接到其代理,并且有消息挂起,而生产者A的代理断开连接,则工作正常。 < li >如果生产者(A)与网络断开连接,当B再次可用时,A的代理不会将消息传递给B的代理。

在网络连接器之前,我尝试了jmsbridge geConnector,在本地代理配置中使用out⃣TopicBridge,但没有任何运气。

问题是:我如何让本地的经纪人A在重新连接时向中央的经纪人B发送消息。虽然它不可用,但要确保他不会丢失任何消息。

注意:

  • 我工作的网络不可用(可以持续数天!),我只能依靠http端口,这就是为什么它是唯一打开的端口。这意味着不可能发现多播。
  • 邮件只能传递一次。
  • 我使用本地经纪人的原因是不要管理我必须自己发送的内容。目前,它们仅用于存储和转发到中心。

编辑:我已经能够使用JMS桥使其工作,但是我有最后一个问题,如果在应用程序启动或应用程序生命周期期间连接os丢失,我需要重新启动我的代理才能发送消息。

共有2个答案

师谦
2023-03-14

我已经尝试了所有的方法,但仅使用配置无法使其正常工作,因此我最终自己这样做:

<jms:listener-container container-type="default" factory-id="proxyFactory"
    acknowledge="transacted" destination-type="topic" connection-factory="jmsFactory">
<bean id="remoteJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="remoteJmsFactory" />
    <property name="pubSubDomain" value="true"/>
</bean>
<bean id="simpleMessageProxyListener"
    class="com.xxx.jms.test.SimpleMessageProxyListener">
    <property name="jmsTemplate" ref="remoteJmsTemplate" />
    <property name="queueName" value="${jms.topic.sample}" />
</bean>

基本上我只有一个类,它通过持久订阅订阅本地代理,并向远程发送消息,如果失败,会话将被回滚。

这个简单代理依赖于Spring Listener的容器,所以即使他监听一个远程代理,它也可以工作,在我的例子中,它监听一个本地嵌入式代理,所以我不会有任何问题。

如果其他人在本地应用程序运行时停止/启动远程代理时有唯一有效的配置答案,并且不需要重新启动发送消息,请随意发布,我将投票并检查。

注意:您必须将jms.redeliveryPolicy.maximumDeliveries设置为-1才能使其工作。

岳泉
2023-03-14

我一直在使用这种“存储和转发”模式,并成功地使用了桥接。

我无法对网络连接器发表评论,但对于网桥,您必须:

  • 由于AMQ-5859错误而使用最新版本的jmeter
  • 在网桥上添加一个组织
  • 确保在远程代理连接工厂上设置重新连接
  • 异常
 类似资料:
  • 我们需要将消息从一个ActiveMQ代理复制到另一个代理。这里消息必须只是复制,并且消息应该存在于两个代理中。 我可以想到一个自定义应用程序,它订阅某个目标并读取该消息并将消息重新发布到多个代理中的目标。 我没有权限在经纪人中进行更改,所以我想不出经纪人网络选项。 是否有任何最佳实践或工具可用于将A-MQ消息从一个代理复制到另一个代理?

  • 我正在尝试设置Kafka Connect,目的是运行Elasticsearch chSinkConntor。 Kafka安装程序,由3个使用Kerberos、SSL和ACL保护的代理程序组成。 到目前为止,我一直在尝试使用docker/docker-com的连接器运行连接框架和elasticserch-server本地化(使用Kafka 2.4连接到远程kafka安装(Kafka 2.0.1-实际

  • 我目前有两个MQTT代理,它们都工作得很好。其中一个是我本地网络上的覆盆子派。另一个是托管在CloudMQTT上的远程代理。 目标是使两个代理彼此同步。当然,简单的解决方案是桥,但我遇到了一些困难,得到一个适当的桥设置。 据我所知,只有当你有一个静态IP和端口转发到Raspberry Pi时,你才能连接到你的本地网络,那么我如何才能绕过这个问题呢?两个代理需要保持一个永久的TCP连接打开,但是远程

  • 启动使用者接收消息 根据我的理解,consumer直接使用来自broker的消息,但在上面的consumer命令中,我们没有提到broker,而只提到zookeeper。消费者是否会连接到zookeeper(而不是broker)来消费消息?

  • 我正在为Kafka工作客户:librdkafka。图书馆在这里https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp.我的程序正在向代理写入2000000条消息。在此过程中,我重新启动了代理。有时,没有消息无法传递到代理。有时,大约100000条消息未能传递到代理。队列缓冲。最大消息数=1000