当前位置: 首页 > 知识库问答 >
问题:

Spring Boot和BlockingQueue侦听器

南宫云
2023-03-14

我已经用spring boot实现了jms,我正在使用@JmsListener来听这个主题

  @Component
    public class AMQListner {
        BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                queue.add(bo);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

现在,我需要一个侦听器来侦听阻塞队列,如果它有值,则进行处理。我们可以在spring boot中使用注释来实现这一点吗?

共有1个答案

郭乐湛
2023-03-14

首先,正确的方法是创建处理程序bean,而不是在接收方类中创建消息队列成员。

public interface MessageHandler extends Consumer<MessageBO> {
    public default void handle(MessageBO msg) { accept(msg); }
}

@Component
public class AMQListener {
    @Resource("multiplexer")
    MessageHandler handler;

    @JmsListener(destination = "${spring.activemq.topic}")
    public void Consume(TextMessage message) {
        try {
            String json = message.getText();
            MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
            handler.handle(bo);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

然后,您将在处理程序bean中拥有队列

@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
    @Autowired
    MessageHandler actualConsumer;

    ExecutorService executor = Executors.newFixedThreadPool(4);
    public void accept(MessageBO msg) {
        executor.submit(msg -> actualConsumer.handle(msg));
    }
}

在这种情况下,执行者几乎就是队列。

警告:您没有这样的1024限制。您可以通过使用ThreadPoolExecutor构造函数并向其传递一个有限的队列来实现这一点。

 类似资料:
  • 我们有web服务(jaxws),它正在调用另一个web服务(aslo jaxws)。Jaxws客户端配置如下所示-- 正如您所看到的,我们有两个拦截器和一个故障侦听器。我们希望在这些拦截器、故障侦听器和web服务代码之间进行通信。正如SO线程中所述,我们使用cxf交换对象在web服务和拦截器之间进行通信。 我们的inFaultInterceptor代码如下所示-- } Web服务代码如下所示- 但

  • 问题内容: 我有这个代码 尽管我仍未使用userdata,但事实是,每当我单击切换按钮时,我都会获得升序的所需输出。 这是输出: 在第一次单击中,我什么也没得到。 从第二次单击开始,我开始获得这种输出。谁能解释这种行为并为我提供解决方案? 问题答案: 该方法中的所有代码都应转到控制器的方法中。 现在,在第一个切换上单击,为设置数据,然后将它们放入组中,然后在上设置侦听器。因此,仅在第二次切换单击时

  • 我有一个使用ITopic的多节点集群Hazelcast应用程序。我试图了解,为了在节点崩溃时正确“清理”事情,我的应用程序是否应该检测节点崩溃并删除该节点的注册ID,或者Hazelcast是否会自动处理该问题。 我所说的“节点崩溃”是指Hazelcast集群中的应用程序在没有调用ITopic的情况下意外终止。removeMessageListener或HazelcastInstance。关闭。这可

  • 问题内容: 我当时在上网,但找不到很好的信息。我试图在每次运行应用程序时检测按键。我正在使用JavaFX并将其与FXML一起运行。我尝试了很多事情,但没有任何效果。请帮我。 问题答案: 您应该签出Ensemble示例。这是关键的侦听器代码。

  • 我正在使用Realex Payments的HPP API开发一个卡支付页面,其中包含一个iFrame,用于托管Realex页面。在Realex请求表单上,我将字段HPP_POST_维度和HPP_POST_响应设置为我的URL,如下所示: 付款页: www.example.com/account/payment.html 隐藏字段值用于在HPP页面大小更改和事务完成时,使用事件侦听器将数据从Real

  • 虽然计算属性在大多数情况下更合适,但有时也需要一个自定义的侦听器。这就是为什么 Vue 通过watch选项提供了一个更通用的方法,来响应数据的变化。当需要在数据变化时执行异步或开销较大的操作时,这个方式是最有用的。例如: <div id="watch-example"> <p> Ask a yes/no question: <input v-model="question">