当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ Artemis可以向STOMP客户端播放未传递的消息吗?

夏祯
2023-03-14

目前,当我的STOMP使用者连接到队列时,被缓冲的消息最终不会被处理。我所说的“缓冲起来”是指生产者在没有消费者连接的情况下向队列写了消息。继续这个场景,当我的消费者连接时,他们能够看到消息,但只能看到新的消息。任何先前的消息最终都不会被发送给消费者。

代理配置

<?xml version='1.0'?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xi="http://www.w3.org/2001/XInclude" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">

        <name>0.0.0.0</name>

        <persistence-enabled>true</persistence-enabled>

        <!-- this could be ASYNCIO, MAPPED, NIO
            ASYNCIO: Linux Libaio
            MAPPED: mmap files
            NIO: Plain Java Files
        -->
        <journal-type>ASYNCIO</journal-type>

        <paging-directory>data/paging</paging-directory>
        <bindings-directory>data/bindings</bindings-directory>
        <journal-directory>data/journal</journal-directory>
        <large-messages-directory>data/large-messages</large-messages-directory>

        <journal-datasync>true</journal-datasync>
        <journal-min-files>2</journal-min-files>
        <journal-pool-files>10</journal-pool-files>
        <journal-device-block-size>4096</journal-device-block-size>
        <journal-file-size>10M</journal-file-size>

        <!--
        This value was determined through a calculation.
        Your system could perform 50 writes per millisecond
        on the current journal configuration.
        That translates as a sync write every 20000 nanoseconds.

        Note: If you specify 0 the system will perform writes directly to the disk.
                We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
        -->
        <journal-buffer-timeout>20000</journal-buffer-timeout>

        <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
        -->
        <journal-max-io>4096</journal-max-io>
        <!--
        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
            <network-check-NIC>theNicName</network-check-NIC>
        -->

        <!--
        Use this to use an HTTP server to validate the network
            <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

        <!-- <network-check-period>10000</network-check-period> -->
        <!-- <network-check-timeout>1000</network-check-timeout> -->

        <!-- this is a comma separated list, no spaces, just DNS or IPs
            it should accept IPV6

            Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                    You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
        <!-- <network-check-list>10.0.0.1</network-check-list> -->

        <!-- use this to customize the ping used for ipv4 addresses -->
        <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

        <!-- use this to customize the ping used for ipv6 addresses -->
        <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->

        <!-- how often we are looking for how many bytes are being used on the disk in ms -->
        <disk-scan-period>5000</disk-scan-period>

        <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
            that won't support flow control. -->
        <max-disk-usage>90</max-disk-usage>
        <!-- should the broker detect dead locks and other issues -->
        <critical-analyzer>true</critical-analyzer>
        <critical-analyzer-timeout>120000</critical-analyzer-timeout>
        <critical-analyzer-check-period>60000</critical-analyzer-check-period>
        <critical-analyzer-policy>HALT</critical-analyzer-policy>
        <page-sync-timeout>1020000</page-sync-timeout>

        <acceptors>
            <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
            <acceptor name="stomp">tcp://0.0.0.0:61613?stompEnableMessageId=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
        </acceptors>

        <connectors>
            <connector name="global">tcp://172.17.0.1:61616</connector>
            <connector name="s">tcp://172.17.0.1:61617</connector>
        </connectors>

        <cluster-user>cluster</cluster-user>
        <cluster-password>REDACTED</cluster-password>

        <cluster-connections>
            <cluster-connection name="multi-region">
                <connector-ref>global</connector-ref>
                <message-load-balancing>ON_DEMAND</message-load-balancing>
                <static-connectors>
                    <connector-ref>s</connector-ref>
                </static-connectors>
            </cluster-connection>
        </cluster-connections>

        <addresses>
            <address name="/queue/global.regional">
                <multicast>
                    <queue name="/queue/global.regional">
                        <durable>true</durable>
                    </queue>
                </multicast>
            </address>
        </addresses>

        <address-settings>

            <!-- if you define auto-create on certain queues, management has to be auto-create -->
            <address-setting match="activemq.management#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <!-- with -1 only the global-max-size is in use for limiting -->
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics>
            </address-setting>

            <!--default for catch all-->
            <address-setting match="#">
                <!-- <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay> -->
                <!-- with -1 only the global-max-size is in use for limiting -->
                <!-- <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics> -->
            </address-setting>
        </address-settings>

        <security-settings>
            <security-setting match="#">
                <permission type="createNonDurableQueue" roles="amq"/>
                <permission type="deleteNonDurableQueue" roles="amq"/>
                <permission type="createDurableQueue" roles="amq"/>
                <permission type="deleteDurableQueue" roles="amq"/>
                <permission type="createAddress" roles="amq"/>
                <permission type="deleteAddress" roles="amq"/>
                <permission type="consume" roles="amq"/>
                <permission type="browse" roles="amq"/>
                <permission type="send" roles="amq"/>
                <!-- we need this otherwise ./artemis data imp wouldn't work -->
                <permission type="manage" roles="amq"/>
            </security-setting>
        </security-settings>

    </core>
</configuration>

共有1个答案

霍鸣
2023-03-14

根据描述,听起来您的客户使用的是通常所说的“pub/sub”语义。在ActiveMQ Artemis文档和配置中,这被称为“多播”语义。当使用pub/sub语义时,消费者(即订阅者)只有在连接到目的地之后(即创建订阅之后)才会收到发送的消息。

在JMS中,这些语义由客户机使用队列还是主题来控制。然而,STOMP规范只定义了一个通用的“目的地”,没有特定的语义。在“议定书概述”一节中,它指出:

STOMP服务器被建模为一组消息可以发送到的目的地。STOMP协议将目的地视为不透明字符串,它们的语法是特定于服务器实现的。另外,STOMP没有定义目的地的交付语义。目的地的传递或“消息交换”语义可能因服务器而异,甚至因目的地而异。这使得服务器能够创造性地使用它们可以通过STOMP支持的语义。

        <addresses>
            <address name="/queue/global.regional">
                <anycast>
                    <queue name="/queue/global.regional"/>
                </anycast>
            </address>
        </addresses>
 类似资料:
  • 我们正试图从ActiveMQ5.x升级到最新的Artemis(2.17),但我们的一个应用程序让我们很难过。 我们单独运行了一个事务,启用了Artemis STOMP调试,下面是我们看到的: 问题的核心似乎是来自上面显示的“send”操作的消息从未到达接收者。我们已经验证了接收者是在线的,并且它响应使用JMS和Openwire连接的其他客户端--但是这个STOMP消息没有被路由到目的地。 如果关闭

  • 我使用Spring的STOMP over WebSocket实现与一个全功能的ActiveMQ代理。当用户向主题订阅时,在成功订阅之前必须通过一些权限逻辑。我使用ChannelInterceptor应用权限逻辑,如下所示: WebSocketConfig。爪哇: WebSocketSecurityConfig.java: MySubscriptionInterceptor。爪哇: 当没有足够权限的

  • 我有一个Spring Boot服务器和一个客户端,它们使用STOMP通过Websockets连接。 我的用例是,每次向特定endpoint发出http请求时,我都想向客户机发送数据。我找到的所有教程,只展示了这种情况,客户端向“/hello”发送一些数据,服务器通过向“topic/greetings”发送数据做出反应: 我需要的是一个控制器方法,它无需消息映射即可发送数据。它应该只是在每次有人对e

  • 我已经找了一段时间来解决这个问题,但没找到什么。 我的目标是接收来自udp客户端的消息,服务器接收该消息并将其转发给web客户端,web客户端在每次接收到消息时播放一个音频片段。但是,由于某种原因,音频无法播放。如果我直接从我的目录打开页面,音频可以播放,但是如果我试图通过本地主机访问它,它无法加载。有人知道解决办法吗? 这是客户端javascript。 此页面由服务器.js、节点.js文件使用

  • 移动客户端发送->“嗨,我的账单付了吗?”通过WebSocket. websocket服务器接收此消息并通过redis pub/sub将其传递给websocket客户端服务。 因为websocket服务器和websocket客户端通过Redi pub/sub连接。它们可以交换消息。 WebSocket客户端将通过socket连接到人工代理系统,并通过“嗨,我的账单付了吗?”。 响应(“是的,是付费