我正在用Java实现Akka-Alpakka,用于从ActiveMQ队列消费和生成队列。我可以成功地从队列中消费,但还不能实现应用程序级消息确认。
我的目标是使用队列中的消息,并将它们发送给另一个参与者进行处理。当该角色完成处理后,我希望它能够控制ActiveMQ中的消息确认。这大概是通过向另一个可以进行确认的参与者发送消息、调用消息本身的确认函数或其他方式来完成的。
在我的测试中,两条消息被放入AlpakkaTest队列,然后这段代码尝试使用并确认它们。然而,我看不到将ActiveMQ会话设置为CLIENT_ACKNOWLEDGE的方法,也看不到调用<code>m.acknowle()与不调用的行为有任何区别 。因此,我认为消息仍然是自动确认的。
有谁知道在爪哇阿卡系统中为CLIENT_ACKNOWLEDGE配置 ActiveMQ 会话并使用阿尔帕卡手动确认 ActiveMQ 消息的公认方法吗?
相关测试功能是:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.
Source<Message, NotUsed> jmsSource = JmsSource.create(
JmsSourceSettings.create(connectionFactory)
.withQueue("AlpakkaTest")
.withBufferSize(2)
);
Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.
try {
List<Message> messages = jmsSource
.take(2)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(4, TimeUnit.SECONDS);
for(Message m:messages) {
System.out.println("Found Message ID: " + m.getJMSMessageID());
try {
m.acknowledge();
} catch(JMSException jmsException) {
System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
} catch (TimeoutException e1) {
e1.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
此代码打印:
Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1
查看Alpakka JmsSourceStage的源代码,它已经为您确认了每个传入消息(它的会话是客户端确认会话)。从源代码中我可以看出,没有允许您确认消息的模式。
您可以在此处查看阿尔帕卡的源代码。
更新:自 Alpakka 0.15 起,确认模式可在 JMS 连接器中配置。从链接的文档:
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
.create(connectionFactory)
.withQueue("test")
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
);
CompletionStage<List<String>> result = jmsSource
.take(msgsIn.size())
.map(message -> {
String text = ((ActiveMQTextMessage)message).getText();
message.acknowledge();
return text;
})
.runWith(Sink.seq(), materializer);
从版本0.11开始,Alpakka的JMS连接器不支持应用程序级消息确认。Alpakka在内部使用<code>CLIENT_ACKNOWLEDGE<code>模式创建<code>会话
有一个开放的票证讨论了启用基于队列的源的下游确认,但该票证已停用一段时间。
目前,您不能阻止Alpakka在JMS级别确认消息。但是,这并不妨碍您向流中添加一个阶段,将每条消息发送给一个参与者进行处理,并将参与者的回复用作背压信号。Akka Streams文档描述了如何使用< code>mapAsync和< code>ask的组合或< code > sink . actorrefwithack 来完成此操作。例如,使用前者:
Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS);
jmsSource
.mapAsync(2, msg -> ask(processorActor, msg, askTimeout))
.runWith(Sink.seq(), materializer);
(旁注:在相关的Streamz项目中,有一个最近开放的票证,允许应用程序级确认。Streamz是旧的akka-camel模块的替代品,和Alpartka一样,是在Akka Streams上构建的。Streamz还有一个JavaAPI,作为外部连接器列在Alpartka留档中。)
我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多
以前我读取队列中的所有消息,但现在我必须根据用户的选择返回特定数量的消息(计数)。 我试着相应地改变for循环,但是由于自动应答,它读取了所有的消息。所以我尝试在配置文件中将它改为手动。 在我的程序中,如何在读取 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收,我没有频道的参考)? 任何帮助都是非常值得赞赏的,提前表示感谢。
我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执
如果在ActiveMQ中创建了一个队列,该队列具有一个生产者(即客户端确认模式)和一个侦听器,在成功处理消息后,其onMessage方法中仅对其进行确认。假设存在异常,并且消息未被确认,因此仍在队列中。是否会再次发送给消费者?或者这些信息会发生什么?
我创建了一个具有事务 和 的生产者,该创建器将 99 条消息放在队列中。 我为具有不同会话的同一队列创建了一个消费者,并处理了和。 我没有确认第一条消息,但确认了其余98条消息。 我打开ActiveMQ管理控制台,预计会看到1条消息挂起/1条在队列中,但令我惊讶的是,我看到所有99条消息都已出列。 有人能指出我哪里出错了吗?
我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个