当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ/STOMP清除指向目的地的日程消息

史宸
2023-03-14

我想删除计划传递到特定队列的消息,但我发现该过程不必要地繁琐。

在这里,我将一条空白消息发送到具有延迟的队列:

self._connection.send(body="test", destination=f"/queue/my-queue", headers={
    "AMQ_SCHEDULED_DELAY": 100_000_000,
    "foo": "bar"
})

在这里,我想清除该队列的计划消息:

self._connection.send(destination=f"ActiveMQ.Scheduler.Management", headers={
    "AMQ_SCHEDULER_ACTION": "REMOVEALL",
}, body="")

当然,这里的“目的地”需要是ActiveMQ.计划程序,而不是我的实际队列。但是我无论如何都找不到删除预定队列/我的队列的计划消息。我尝试使用选择器标头,但这似乎不适用于AMQ_SCHEDULER_ACTION类型的消息。

我看到的唯一建议是编写一个使用者来浏览所有计划的消息,检查每个消息的目标,并按其ID删除每个计划。这对我来说似乎很疯狂,因为我不仅有几条消息,还有数百万条消息,我想删除。

有没有一种方法可以向ActiveMQ发送一个命令来清除带有自定义头值的预定消息?

也许我可以为每个队列定义一个自定义计划消息位置?

编辑:

我围绕 stomp.py 连接编写了一个包装器,用于处理发往队列的清除计划。MQS踩踏面采取现有的踩踏。连接和您正在使用的队列的名称,并提供排队enqueue_many接收清除移动

当从队列接收时,如果< code>include_delayed为< code>True,它将订阅队列和使用计划的主题。假设消息与此类一起排队,并且将原始目标队列的名称作为自定义头,则不会发往接收队列的计划消息将被过滤掉。

尚未在生产中测试。可能这里做了很多优化

用法:

stomp = MQStompFacade(connection, "my-queue")

stomp.enqueue_many([
  EnqueueRequest(message="hello"),
  EnqueueRequest(message="goodbye", delay=100_000)
])

stomp.purge() # <- removes queued and scheduled messages destined for "/queues/my-queue"
class MQStompFacade (ConnectionListener):

    def __init__(self, connection: Connection, queue: str):
        self._connection = connection
        self._queue = queue
        self._messages: List[Message] = []
        self._connection_id = rand_string(6)
        self._connection.set_listener(self._connection_id, self)

    def __del__(self):
        self._connection.remove_listener(self._connection_id)

    def enqueue_many(self, requests: List[EnqueueRequest]):
        txid = self._connection.begin()
        for request in requests:
            headers = request.headers or {}

            # Used in scheduled message selectors
            headers["queue"] = self._queue

            if request.delay_millis:
                headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
            if request.priority is not None:
                headers['priority'] = request.priority

            self._connection.send(body=request.message,
                                  destination=f"/queue/{self._queue}",
                                  txid=txid,
                                  headers=headers)
        self._connection.commit(txid)

    def enqueue(self, request: EnqueueRequest):
        self.enqueue_many([request])

    def purge(self, selector: Optional[str] = None):
        num_purged = 0
        for _ in self.receive(idle_timeout=5, selector=selector):
            num_purged += 1
        return num_purged

    def move(self, destination_queue: AbstractQueueFacade,
             selector: Optional[str] = None):

        buffer_size = 500
        move_buffer = []

        for message in self.receive(idle_timeout=5, selector=selector):
            move_buffer.append(EnqueueRequest(
                message=message.body
            ))

            if len(move_buffer) >= buffer_size:
                destination_queue.enqueue_many(move_buffer)
                move_buffer = []

        if move_buffer:
            destination_queue.enqueue_many(move_buffer)

    def receive(self,
                max: Optional[int] = None,
                timeout: Optional[int] = None,
                idle_timeout: Optional[int] = None,
                selector: Optional[str] = None,
                peek: Optional[bool] = False,
                include_delayed: Optional[bool] = False):
        """
        Receiving messages until one of following conditions are met

        Args:
            max: Receive messages until the [max] number of messages are received
            timeout: Receive message until this timeout is reached
            idle_timeout (seconds): Receive messages until the queue is idle for this amount of time
            selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
            peek: Set to TRUE to disable automatic ack on matched criteria. Peeked messages will remain the queue
            include_delayed: Set to TRUE to return messages scheduled for delivery in the future
        """
        self._connection.subscribe(f"/queue/{self._queue}",
                                   id=self._connection_id,
                                   ack="client",
                                   selector=selector
                                   )
        if include_delayed:
            browse_topic = f"topic/scheduled_{self._queue}_{rand_string(6)}"
            schedule_selector = f"queue = '{self._queue}'"
            if selector:
                schedule_selector = f"{schedule_selector} AND ({selector})"

            self._connection.subscribe(browse_topic,
                                       id=self._connection_id,
                                       ack="auto",
                                       selector=schedule_selector
                                       )

            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",
                headers={
                    "AMQ_SCHEDULER_ACTION": "BROWSE",
                    "JMSReplyTo": browse_topic
                },
                id=self._connection_id,
                body=""
            )

        listen_start = time.time()
        last_receive = time.time()
        messages_received = 0
        scanning = True
        empty_receive = False
        while scanning:
            try:
                message = self._messages.pop()
                last_receive = time.time()
                if not peek:
                    self._ack(message)
                messages_received += 1
                yield message
            except IndexError:
                empty_receive = True
                time.sleep(0.1)

            if max and messages_received >= max:
                scanning = False
            elif timeout and time.time() > listen_start + timeout:
                scanning = False
            elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
                scanning = False
            else:
                scanning = True

        self._connection.unsubscribe(id=self._connection_id)

    def on_message(self, frame):
        destination = frame.headers.get("original-destination", frame.headers.get("destination"))
        schedule_id = frame.headers.get("scheduledJobId")

        message = Message(
            attributes=MessageAttributes(
                id=frame.headers["message-id"],
                schedule_id=schedule_id,
                timestamp=frame.headers["timestamp"],
                queue=destination.replace("/queue/", "")
            ),
            body=frame.body
        )
        self._messages.append(message)

    def _ack(self, message: Message):
        """
        Deletes the message from queue.
        If the message has an scheduled_id, will also remove the associated scheduled job
        """
        if message.attributes.schedule_id:
            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",
                headers={
                    "AMQ_SCHEDULER_ACTION": "REMOVE",
                    "scheduledJobId": message.attributes.schedule_id
                },
                id=self._connection_id,
                body=""
            )
        self._connection.ack(message.attributes.id, subscription=self._connection_id)

共有1个答案

丰超
2023-03-14

为了删除特定的消息,您需要知道通过浏览计划的消息可以获得的ID。唯一可用的其他选项是使用删除操作中的开始和停止时间选项来删除范围内的所有消息。

MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
producer.send(request);

如果那不适合你的需要,我相信这个项目会欢迎你的贡献。

 类似资料:
  • 我们设置了几个ActiveMQ Artemis 2.17.0集群,以便在数据中心之间使用镜像进行复制。 在ActiveMQ Artemis 2.17.0或更高版本中有什么方法可以实现这一点吗?

  • 目前,当我的STOMP使用者连接到队列时,被缓冲的消息最终不会被处理。我所说的“缓冲起来”是指生产者在没有消费者连接的情况下向队列写了消息。继续这个场景,当我的消费者连接时,他们能够看到消息,但只能看到新的消息。任何先前的消息最终都不会被发送给消费者。 代理配置

  • 问题内容: 将一些地图定义为: 我想要一个指向地图地址的变量(不要复制所有变量)。我尝试使用: 但在使用时,它显示 内部编译器错误:无类型的var,初始化:new 如何获得? 编辑 该错误由另一个问题显示。 问题答案: 映射是引用类型,因此它们总是通过引用传递。您不需要指针。转到文档

  • 我们正在将SpringWebSockets集成到我们的应用程序中,我运行了HelloWorld示例,令人惊讶的是,spring为我们连接了一切,以便将服务器端通知推送到客户端。 不过,我有一些简单的问题 1) 队列是如何创建的?我使用的是ActiveMQ,队列名称与我在目的地中指定的不同(例如,像greetings-user3n9\u jn3i)。 2)目标名称是否不同于队列? 3) 我正在使用A

  • 有人能给我指出一个不错Java例子吗?在这个例子中,践踏客户端被用来连接到ActiveMQ。我还对以下内容感兴趣: 是否支持故障转移? 如何创建持久订阅? Stachp支持异步消息传递吗?示例?我想我必须为它实现MessageListener接口,但我找不到它的示例。

  • 这是我的蓝图代码。 内部消息工作。 外部消息不工作。 我在docker中运行Red Hat AMQ 7和Fuse 7。内部消息队列在服务之间正常工作。 使用AMQ 6,当在activemq中设置时,我能够向61613上的嵌入式代理发送和接收STOMP消息。xml 现在,我正在使用AMQ 7.0的默认设置,它将接受程序设置为0.0.0.0:61616和61613,这两个接受STOMP协议。 但是相同