当前位置: 首页 > 知识库问答 >
问题:

spring jmsListener侦听多个队列

周越泽
2023-03-14

在这篇文章中,加里·拉塞尔解释了如何通过编程创建多个Kafka列斯汀来聆听多个主题。。Kafka·斯普林:如何动态或在循环中创建监听器?

现在我想有一个类似的设置也适用于JMSListeners-在那里我可以有一个带有一个@JMSListener的类,并且我可以以编程方式创建该JMSListener的多个实例,每个实例都注入了自己的queueName。

我发现这篇文章Spring JMS开始根据请求监听jms队列

在这篇帖子的最后,Gary发表了类似的评论,

如果你想动态地创建大量的容器,那么只需以编程方式创建容器,调用后属性集(),然后启动()

我使用了上面第一篇文章中的设置(与KafkaListeners相关),我的JMS侦听器的多个实例正在启动,但没有使用任何消息。

基本上我不知道我在哪里做这个

然后只需以编程方式创建容器,调用AfterPropertieSet(),然后启动()

我对容器这个词感到困惑,我知道有JMSListener,也有JmsListenerContainerFactory,在这个上下文中,容器是什么?我想是JMSListener?

我已经确认队列中有消息。此外,当我不以编程方式创建侦听器并且只有一个带有硬编码队列的侦听器时,它会消耗消息罚款。

当我以编程方式创建多个JMS Listeners时,基本上没有一个侦听器正在使用消息

    @SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
    private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
    private static Consumers consumersStatic;

    @Autowired
    Consumers consumers;

    @PostConstruct
    public void init() {
        consumersStatic = this.consumers;
    }

    @Bean
    public Gson gson() {
        return new Gson();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
        logger.debug("queueInformationList ************" + queueInformationList.toString());
        for (QueueInformation queueInformation : queueInformationList) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(MQConfig.class);
            Properties props = new Properties();
            props.setProperty("mqQueueName", queueInformation.getMqQueueName());
            //
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }
}

以下是包含listenerContainerFactory的MQConfig

@Configuration
public class MQConfig {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${ibm.mq.user}")
    private String mqUserName;

    @Bean
    public MQListener listener() {
        return new MQListener();
    }

    @PostConstruct
    public void afterConstruct() {
        logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        // Put the MQ username in the PCF environment.
        // Otherwise, the connection is identified by PCF's default user, "VCAP"
        System.setProperty("user.name", mqUserName);
        return factory;
    }
}

然后是MQListener,它具有实际的@JMSListener

    public class MQListener {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${mqQueueName}")
    private String mqQueueName;

    @PostConstruct
    public void afteConstruct() {
        logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);

    }

    @JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
    public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
        logger.debug("***********************************************receivedMessage:" + receivedMessage);
    }
}

这是我的申请表。yml公司

    ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
  queueInformationList:
  -
    mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
  -
    mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD

共有1个答案

秦晋
2023-03-14

好吧,我找到了另一个帖子,Gary回答了我正在寻找的添加动态听众数量(Spring JMS)

基本上这是我的工作解决方案。干得好@GaryRussell-我现在是粉丝了:)

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        int i = 0;
        for (QueueInformation queueInformation :
                queueInformationList) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + i++);
            endpoint.setDestination(queueInformation.getMqQueueName());
            endpoint.setMessageListener(message -> {
                logger.debug("***********************************************receivedMessage:" + message);
            });
            registrar.registerEndpoint(endpoint);
            logger.debug("registered the endpoint for queue" + queueInformation.getMqQueueName());
  }
}

参见https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms-annotated-programmatic-registration

 类似资料:
  • 问题内容: 我正在使用EJB 3.1,并且想配置一个MDB来侦听多个队列。 我更喜欢通过XML定义队列名称,而其他通过注释定义。 能做到吗? 问题答案: 实例化后,MDB只能侦听在其目标ActivationConfigProperty中指定的资源,但是您 可以 为同一MDB创建具有不同目标的多个实例(在您的情况下为队列)。 在ejb-jar.xml中创建两个条目,它们具有不同的目的地和ejb-na

  • 服务器部件: 客户部分:io.js 消息组件 信息形式——发布过程的开始

  • 我刚接触Spring boot,我正在玩弄它。目前,我已经构建了一些应用程序,希望能够通过队列相互通信。我目前有一个侦听器对象,可以从特定队列接收消息。 这有效。但是,现在我希望能够监听另一个队列。所以我想我会复制上面的对象并更改队列名称。不幸的是,这不起作用,因为Spring Boot只为其中一个创建连接。关于如何让我的Spring Boot应用程序监听多个队列的任何想法?

  • 我是JComboBox的新手 我有4个JComboBox:专用、etudiant、annee和semestre。 每次更改所选项目并将结果添加到滚动窗格(groupe des matieres ouvertes)时,我都需要从其中的4个项目中获取所选项目

  • 我有三个SQS FIFO队列,其中每个队列在EC2实例中都有一个数据投影侦听器守护进程,作为docker pod(SQL Server、PostgreSQL、Elastic Search等) 所有队列的设置如下(死信队列稍后设置)。 这都是我使用DynamoDB流设计的事件源架构的一部分= 启用以避免队列中的重复消息,因为Lambda路由器中的任何队列总是可能出现错误。 现在,我还将每条消息的设置

  • 我试图让队列在laravel 5中工作,队列侦听器正在输出: 未定义索引:表 存在"作业"和"failed_jobs"表,config.php设置为"数据库"。 搜索laravel论坛和google都没有找到解决办法,艾米的想法去哪里找?