当前位置: 首页 > 知识库问答 >
问题:

使用Alpakka手动确认ActiveMQ消息

穆彬郁
2023-03-14

我正在用Java实现Akka-Alpakka,用于从ActiveMQ队列消费和生成队列。我可以成功地从队列中消费,但还不能实现应用程序级消息确认。

我的目标是使用队列中的消息,并将它们发送给另一个参与者进行处理。当该角色完成处理后,我希望它能够控制ActiveMQ中的消息确认。这大概是通过向另一个可以进行确认的参与者发送消息、调用消息本身的确认函数或其他方式来完成的。

在我的测试中,两条消息被放入AlpakkaTest队列,然后这段代码尝试使用并确认它们。然而,我看不到将ActiveMQ会话设置为CLIENT_ACKNOWLEDGE的方法,也看不到调用<code>m.acknowle()与不调用的行为有任何区别 。因此,我认为消息仍然是自动确认的。

有谁知道在爪哇阿卡系统中为CLIENT_ACKNOWLEDGE配置 ActiveMQ 会话并使用阿尔帕卡手动确认 ActiveMQ 消息的公认方法吗?

相关测试功能是:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.

Source<Message, NotUsed> jmsSource = JmsSource.create(
    JmsSourceSettings.create(connectionFactory)
        .withQueue("AlpakkaTest")
        .withBufferSize(2)
);

Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.

try {
    List<Message> messages = jmsSource
        .take(2)
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture().get(4, TimeUnit.SECONDS);

    for(Message m:messages) {
        System.out.println("Found Message ID: " + m.getJMSMessageID());

        try {
            m.acknowledge();
        } catch(JMSException jmsException) {
            System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
        }
    }
} catch (InterruptedException e1) {
    e1.printStackTrace();
} catch (ExecutionException e1) {
    e1.printStackTrace();
} catch (TimeoutException e1) {
    e1.printStackTrace();
} catch (JMSException e) {
    e.printStackTrace();
}

此代码打印:

Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1

共有2个答案

杜轩昂
2023-03-14

查看Alpakka JmsSourceStage的源代码,它已经为您确认了每个传入消息(它的会话是客户端确认会话)。从源代码中我可以看出,没有允许您确认消息的模式。

您可以在此处查看阿尔帕卡的源代码。

王建华
2023-03-14

更新:自 Alpakka 0.15 起,确认模式可在 JMS 连接器中配置。从链接的文档:

Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
    .create(connectionFactory)
    .withQueue("test")
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
);

CompletionStage<List<String>> result = jmsSource
    .take(msgsIn.size())
    .map(message -> {
        String text = ((ActiveMQTextMessage)message).getText();
        message.acknowledge();
        return text;
    })
    .runWith(Sink.seq(), materializer);

从版本0.11开始,Alpakka的JMS连接器不支持应用程序级消息确认。Alpakka在内部使用<code>CLIENT_ACKNOWLEDGE<code>模式创建<code>会话

有一个开放的票证讨论了启用基于队列的源的下游确认,但该票证已停用一段时间。

目前,您不能阻止Alpakka在JMS级别确认消息。但是,这并不妨碍您向流中添加一个阶段,将每条消息发送给一个参与者进行处理,并将参与者的回复用作背压信号。Akka Streams文档描述了如何使用< code>mapAsync和< code>ask的组合或< code > sink . actorrefwithack 来完成此操作。例如,使用前者:

Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS);

jmsSource
    .mapAsync(2, msg -> ask(processorActor, msg, askTimeout))
    .runWith(Sink.seq(), materializer);

(旁注:在相关的Streamz项目中,有一个最近开放的票证,允许应用程序级确认。Streamz是旧的akka-camel模块的替代品,和Alpartka一样,是在Akka Streams上构建的。Streamz还有一个JavaAPI,作为外部连接器列在Alpartka留档中。)

 类似资料:
  • 我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多

  • 以前我读取队列中的所有消息,但现在我必须根据用户的选择返回特定数量的消息(计数)。 我试着相应地改变for循环,但是由于自动应答,它读取了所有的消息。所以我尝试在配置文件中将它改为手动。 在我的程序中,如何在读取 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收,我没有频道的参考)? 任何帮助都是非常值得赞赏的,提前表示感谢。

  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

  • 如果在ActiveMQ中创建了一个队列,该队列具有一个生产者(即客户端确认模式)和一个侦听器,在成功处理消息后,其onMessage方法中仅对其进行确认。假设存在异常,并且消息未被确认,因此仍在队列中。是否会再次发送给消费者?或者这些信息会发生什么?

  • 我创建了一个具有事务 和 的生产者,该创建器将 99 条消息放在队列中。 我为具有不同会话的同一队列创建了一个消费者,并处理了和。 我没有确认第一条消息,但确认了其余98条消息。 我打开ActiveMQ管理控制台,预计会看到1条消息挂起/1条在队列中,但令我惊讶的是,我看到所有99条消息都已出列。 有人能指出我哪里出错了吗?

  • 我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个