当前位置: 首页 > 知识库问答 >
问题:

使用Akka持久邮箱进行事务消息处理

郎同化
2023-03-14

我有一个与持久JMS(ActiveMQ)队列集成的Java /Akka应用程序

PersistentQueue封装了一个包含批处理作业的JMS/ActiveMQ队列。在事务中接收消息,这样,如果服务器在作业执行过程中停机,则在重新启动时将保留作业。如果作业成功完成或被用户取消,则提交此事务以永久删除消息;如果作业失败,则如果作业执行的重试次数少于MAX\u,则回滚事务(将消息放在队列的前面)。

Batch Manager是与REST控制器的接口。由于作业执行期间调用的存储过程所施加的限制,它一次只能执行一个批处理作业。Batch Manager从控制器接收作业,并将其发送到要放入JMS队列的持久队列,然后在作业排队(除非另一个作业正在执行)或作业完成时,轮询持久队列以查找新作业。

我想删除JMS队列和处理其JMSEceptions的所有复杂性,并将其替换为用于Batch Manager的持久邮箱。问题是,我不知道如何使用持久邮箱复制JMS事务—我的理解是,如果服务器在作业执行过程中停机,则该消息将永远丢失(而不是放回JMS队列)。

有没有一种方法可以使用Akka持久邮箱进行事务性消息处理,以便在执行过程中服务器停机时不会丢失消息?

共有2个答案

丌官翰采
2023-03-14

您可以使用PeristentActor来跟踪已发布和已完成的作业来实现这一点。

分布式工作人员”激活器模板包含这样一个工作单元管理参与者。(以及动态工作人员注册和集群,但即使您不感兴趣,它可能仍然值得一看)。

Scala版本的模板和Java版本的模板

奚飞星
2023-03-14

Akka文件说明:

持久邮箱就像任何其他不太可能是事务性的邮箱一样。如果参与者在收到消息后但在完成处理之前崩溃,则消息可能会丢失。

但也有另一种类型的邮箱-带有显式确认的邮箱(又名PeekMailbox)。在这里您可以找到使用示例。这是实现的源代码。

我认为,通过实现自定义持久邮箱可以实现您的目标,该邮箱扩展了一些现有的实现,并使用PeekMailbox功能对其进行了扩展。

 类似资料:
  • 清理快照存储区中的旧快照很容易:在每次成功的快照之后,参与者都会收到一个,其中包含指示其序列号的元数据,该信息可以用于构造快照,然后被馈送到。 但是,对于持久化消息,没有与等效的方法。因此,不可能知道日志中“last-ish”消息的序号是什么。可以保留持久消息计数的本地缓存并对其进行快照,以用于调用,但这太繁琐了。 附言。当然,选择用于传递到的序号比上面提到的要复杂一点:即使有一种方法可以从日记中

  • 本文重点给大家介绍如何将邮箱和消息队列运用到实际项目中去。 本次任务 通过 ENV 工具获取 nrf24l01 软件包,并加载到 MDK 工程里面。 了解多线程间的通信,了解 IPC 中邮箱和消息队列的特性,并能灵活使用,实现 ds18b20 线程与 nrf24l01 线程之间的数据通信。 修改 nrf24l01 软件包,实现多点通信功能。 上述任务的重点,是要学习去灵活运用邮箱和消息队列。 软件

  • 我只想使用Akka演员作为邮箱,即我想创建n个线程,每个线程创建1个远程演员。 每个线程都获得对其他线程的所有远程参与者的引用,这样它们就可以通过各自的参与者向彼此发送消息。 参与者定义如下: 其中是对该线程的本地参与者的引用。 我有时会遇到上述模式的死锁(超时5秒),即使使用非常简单的方法和很小的消息也是如此。我将问题缩小到actors在收到消息后,但在输入doReceive(...)的第一个大

  • 在我从Jboss AS5迁移到Wildfly9应用程序的过程中,我试图在Wildfly:javax.transaction.api中加载一个模块。看Wildfly文章如何在WF8中加载模块,它说您可能不需要显式加载一些模块,因为它们是隐式加载的。 然后我在applictionContext.xml中定义它 在jboss-deployment-structure.xml中 但是我得到了这个错误: 谢

  • “ActiveMQ中Blob消息传递的持久性”? "我们不能使用数据库(KahaDB)来Blob消息URL吗?" “我们可以像在远程activemq服务器中一样在嵌入式代理中创建文件服务器吗?”

  • 消息应答 ack >[danger] noAck: false 手动接收消息模式 async consume() { const ch = await this.app.amqplib.createChannel(); await ch.assertQueue(queueName, { durable: false }); const msg = await new Pro