当前位置: 首页 > 知识库问答 >
问题:

Spring JTA Atomikos仅消耗7条消息,所有消息都已退出队列

勾起运
2023-03-14

我有一个简单的Spring boot应用程序,它基于Atomikos事务管理器的JTA。它使用队列中的消息并记录它们。问题是,在第7条传入消息之后,队列中的其余消息将被退出队列,但不会被处理。我意识到这种表现是循环的,我的意思是:

  1. 在队列中插入了10条消息
  2. 已处理并退出第一条消息的队列
  3. 已处理并退出第二条消息的队列
  4. 已处理并退出第三条消息的队列
  5. 已处理并退出第四条消息的队列
  6. 已处理并退出第5条消息的队列
  7. 已处理并退出第6条消息的队列
  8. 已处理第7条消息,并将第7条、第8条、第9条和第10条消息退出队列
  9. 在队列中插入了其他10条消息
  10. 已处理并退出第11条消息的队列
  11. 已处理并退出第12条消息的队列
  12. 已处理并退出第13条消息的队列
  13. 已处理并退出第14条消息的队列
  14. 已处理并退出第15条消息的队列
  15. 已处理并退出第16条消息的队列
  16. 处理第17条消息,并将第17条、第18条、第19条和第20条消息出列

此外,我意识到,如果没有启用事务管理器,它将处理并消耗队列中的所有消息。

是否有任何错误或配置我忘记了这一点?

您可以在以下网站上找到完整代码:https://github.com/PedroRamirezTOR/spring-jta-amq.git

共有1个答案

甄永年
2023-03-14

谢谢贾斯汀!最后,我准备了一个简单的例子来测试这个问题,我意识到只有在使用ActiveMQ控制台插入消息时才会出现这种情况。我准备了一个REST api,在一个事务中插入多条消息,所有消息都被消耗并退出队列。我认为这可能是ActiveMQ web控制台的错误。

如果有人感兴趣,可以从https://github.com/PedroRamirezTOR/spring-jta-amq.git下载这个例子

 类似资料:
  • 我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。

  • 我已经使用手动触发器创建了一个azure服务总线和一个新的逻辑应用程序。然后,我将“从队列获取消息(peek lock)”操作添加到应用程序中,并将最大消息数设置为“20”。 然后我在我的队列中手动创建5条新消息,然后触发我的新逻辑应用程序。然后,当我查看应用程序的执行情况时,我只看到检索到一条消息(并检查,有4条消息仍在我的队列中)。 似乎“20”的计数没有得到尊重。我还检查了我的服务总线队列的

  • 为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享

  • 一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 参考资料 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 发布/订阅 消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。 发布与订阅模式和观察者模式有以下不同: 观察者模式中,观察者和主题都知道对方的存在;

  • 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: *

  • 消息队列接口 结构体 struct   rt_messagequeue   消息队列控制块 更多...   类型定义 typedef struct rt_messagequeue *  rt_mq_t   消息队列类型指针定义   函数 rt_err_t  rt_mq_init (rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_