当前位置: 首页 > 知识库问答 >
问题:

通过SpringIntegration入站适配器接收JMS消息会随机失败

丌官和泰
2023-03-14

我是这个Spring集成和JMS的新手,我开始使用它。在这里,我想通过activemq创建普通的jms消息,并通过spring inbound适配器(消息驱动)接收它。

以下是我的spring配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream
        http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd>
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

<!-- jms beans -->
<beans:bean id="jms.msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <beans:constructor-arg value="MSG_QUEUE" />
</beans:bean>

<beans:bean name="jms.connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <beans:property name="brokerURL" value="tcp://localhost:61616" />
</beans:bean>


 <!-- spring integration beans -->
<channel id="channels.jms.allMessages">
    <queue capacity="1000" />
</channel>

<jms:message-driven-channel-adapter id="adapters.jms.msgAdapter"
            connection-factory="jms.connectionFactory"
            destination="jms.msgQueue"
             channel="channels.jms.allMessages" />

这是我的测试课。

package com.bst.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.core.PollableChannel;

public class TestActiveMQ {

public static void main(String[] args){

    try{
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("app-context.xml");
        ConnectionFactory connectionFactory = (ConnectionFactory)context.getBean("jms.connectionFactory");
        Destination destination = (Destination)context.getBean("jms.msgQueue");
        PollableChannel msgChannel = (PollableChannel) context.getBean("channels.jms.allMessages", PollableChannel.class );

        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


        MessageProducer producer = session.createProducer(destination);

         TextMessage textMessage = session.createTextMessage();
         textMessage.setText("Message from JMS");
         producer.send(textMessage);


         System.out.println("--------------- Message Sending ------------------------");

         Message<?> received = msgChannel.receive();
         String payload = (String) received.getPayload();

         System.out.println("Receving message = " + payload);
    }catch(JMSException ex){
        System.out.println("----------- JMS Exception --------------");
    }

}

}

但问题是我不能保证交货。有些时候程序不能接收消息,有些时候它成功了,但有一些警告,如

Setup of JMS message listener invoker failed for destination 'queue://MSG_QUEUE' -   trying to recover. Cause: Connection reset
Could not refresh JMS Connection for destination 'queue://MSG_QUEUE' - retrying in 5000 ms. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect

无法刷新目标“queue://MSG_QUEUE”的JMS连接,将在5000毫秒后重试,原因:无法连接到代理URL: tcp://localhost:61616。原因:Java . net . connect异常:连接被拒绝:连接

这在成功之前会发生几次。

你们对此有什么想法吗?

感谢您的帮助。

谢谢,凯斯

共有2个答案

樊胜
2023-03-14

我已经在我的STS中测试了这些代码,运行良好。

您这边唯一的问题是,首先启动MessageBroker(比如ActiveMQ),然后运行项目,您可以获得所需的输出。

谢谢。

唐利
2023-03-14

这只意味着侦听器容器启动时代理没有运行。使用tcp://URL时,在创建此上下文之前,应在其自己的上下文(或另一个JVM)中运行代理。

 类似资料:
  • 这就是我的配置 这个想法是每3秒轮询一个目录,并根据通道向调度程序发送3条消息,以允许异步执行。然后根据消息数量聚合消息,然后发送到下一个服务激活器。第一个服务激活器将文件放在源目录中,第二个服务激活器获取聚合列表以将这些文件移动到暂存目录。 似乎发生的情况是,源文件夹跳过了一些文件,但临时文件夹确实获取了所有文件。我的猜测是,轮询器将消息发送到dispatcher通道,但当其线程池变满时,它会忽

  • 我正在试验Spring Webflux和Spring集成,以从JMS队列创建反应流(Flux)。 我试图从JMS队列(使用Spring集成的IBM MQ)创建一个反应流(Spring Webflux),以便客户端异步获取JMS消息。我相信我已经把一切都正确地连接起来了,因为这些信息正被被动的听众所消耗。然而,我的反应流量流无法显示这些消息。任何帮助都将不胜感激。 这是我用来使我的JMS侦听器响应的

  • 我有一个,我使用它向kafka发送消息,然后使用接收消息。通信似乎工作正常,我能够发送和接收消息,但格式有点奇怪。我单独向我的出站适配器发送单个消息,但当我收到消息时,我会收到一条消息,所有消息都聚合到该消息的有效负载中。 这就是我收到消息时消息负载的样子 [有效负载={mytopic={0=[字符串消息1,字符串消息2,字符串消息3,字符串消息4,字符串消息5,…]}},标头={id=3934d

  • 我目前正在一个基于Spring集成(3.0.1.RELEASE版)的应用程序上实现一个流,该应用程序需要将消息存储在JMS队列上,以便稍后提取。为此,我一直在尝试使用带有自定义选择器的Spring Integration JMS入站通道适配器,然后通过将JMSDestinationPollingSource的JMS选择器更改为作为头属性包含的某个匹配ID,从队列中提取消息。 其中一个要求是,我无法

  • 我正在从事一个spring批处理项目,其用例是:spring批处理作业依赖于SFTP服务器(远程目录)上的文件,因此,一旦文件在SFTP服务器上可用,相应的作业(spring批处理)就应该启动。此外,我不想先开始工作,然后再查找文件,因为这将是基于时间的方法,而不是基于通知的方法。所以我想使用spring集成(sftp入站通道适配器)。作为入站适配器(SFTP)的一部分,一旦我在SFTP服务器的远