当前位置: 首页 > 知识库问答 >
问题:

Spring集成-回复通道无响应

戚翰飞
2023-03-14

这是使用注释代码示例的Spring集成执行器通道的后续问题。

我试图通过在“公共频道”中发布一条消息并阅读味精中设置的REPLY_CHANNEL来测试用红色突出显示的框。

“公共通道”是发布-订阅通道。REPLY_通道是一个队列通道。

由于这是一个JUnit测试,我已经模拟了jdbcTemboard、数据源和Impl以忽略任何DB调用。

我的问题是:当我在“公共频道”上发布消息时,我在REPLY\u频道上没有收到任何消息。junit一直在等待响应。

我应该改变什么才能在REPLY_CHANNEL上得到回应?

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1 
@ActiveProfiles("test")
public class QueuetoQueueTest {
    @Configuration
    static class ContextConfiguration { ------------------------------------- 2
        @Bean(name = "jdbcTemplate")
        public JdbcTemplate jdbcTemplate() {
            JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
            return jdbcTemplateMock;
        }

        @Bean(name = "dataSource")
        public DataSource dataSource() {
            DataSource dataSourceMock = Mockito.mock(DataSource.class);
            return dataSourceMock;
        }

        @Bean(name = "entityManager")
        public EntityManager entityManager() {
            EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
            return entityManagerMock;
        }

        @Bean(name = "ResponseChannel")
        public QueueChannel getReplyQueueChannel() {
            return new QueueChannel();
        }

//This channel serves as the 'common channel' in the diagram
        @Bean(name = "processRequestSubscribableChannel")
        public MessageChannel getPublishSubscribeChannel() {
            return new PublishSubscribeChannel();
        }
    }

    @Mock
    DBStoreDaoImpl dbStoreDaoImpl;

    @Test
    public void testDBConnectivity() {
        Assert.assertTrue(true);
    }

    @InjectMocks -------------------------------------------------------------- 3
    StoretoDBConfig storetoDBConfig = new StoretoDBConfig();

    @Autowired
    @Qualifier("ResponseChannel")
    QueueChannel ResponseChannel;

    @Autowired
    @Qualifier("processRequestSubscribableChannel")
    MessageChannel processRequestSubscribableChannel;

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void outboundtoQueueTest() {
        try {
            when(dbStoreDaoImpl.storeToDB(any()))
                    .thenReturn(1); ----------------------------------------------- 4

            //create message
            Message message = (Message<String>) MessageBuilder
                    .withPayload("Hello")
                    .setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
                    .build();

            //send message
            processRequestSubscribableChannel.send(message);
            System.out
                    .println("Listening on InstructionResponseHandlertoEventProcessorQueue");

            //wait for response on reply channel
            Message<?> response = ResponseChannel.receive(); ----------------------- 5
            System.out.println("***************RECEIVED: "
                    + response.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

>

  • 为JUnit加载“ContextConfiguration”,以便不访问数据库。

    这是如何在JUnit中按照https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles

    在config类中,我们模拟jdbcTemplate、dataSource、entityManager,并定义发布请求的“公共通道”和ResponseChannel。

    将jdbcTemplate、dataSource mock注入StoretoDBConfig,使数据库不会被击中

    模拟DaoImpl类,以便忽略DB调用

    测试块在这里,因为在REPLY\u通道上没有响应

    更新代码:

    Code inside 5 (the class that reads from common channel):
    
        @Configuration
        class HandleRequestConfig {
    
            //Common channel - refer diagram
            @Autowired
            PublishSubscribeChannel processRequestSubscribableChannel;
    
            //Step 9 - This channel is used to send queue to the downstream system
            @Autowired
            PublishSubscribeChannel forwardToExternalSystemQueue;
    
            public void handle() {
                IntegrationFlows.from("processRequestSubscribableChannel")          // Read from 'Common channel'
                .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
                .handle(RequestProcessor,"validateMessage")                         // Perform custom business logic - no logic for now, return the msg as is   
                .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
                .channel("forwardToExternalSystemQueue");                           // Post to 'Channel to another system' 
                }
    
        }
    
        //Code inside step 8 - 'Custom Business Logic' 
        @Configuration
        class RequestProcessor {
            public Message<?> validateMessage(Message<?> msg) {
                return msg;
            }
        }
    

    我试图达到的目标:我有单独的业务逻辑jUnit测试用例。我试图测试当请求发布到“公共通道”时,响应是在“另一个系统的通道”上接收到的。

    为什么我不能使用原始的ApplicationContext:因为它连接到DB,我不希望我的JUnit连接到DB或使用嵌入式数据库。我希望忽略对数据库的任何调用。

    我已经将回复通道设置为“ResponseChannel”,“自定义业务逻辑”不应该将其响应发送到“ResponseChannel”吗?

    如果我必须在不同的频道上收听响应,我愿意这样做。我只想测试我在“公共频道”上发送的消息是否在“其他系统的频道”上收到。

    更新2:回答阿特姆的问题。谢谢阿特姆的建议。

    测试配置中是否包含“HandlerRequestConfig”我们不能直接调用handle()方法。相反,我认为如果我在“processRequestSubscribableChannel”上发布,将调用HandlerRequestConfig中的handle()方法,因为它侦听同一个通道。这是错的吗?如何测试HandlerRequestConfig。handle()方法?

    我在HandleRequestConfig(代码更新)的每个步骤的末尾添加了wiretap。我发现没有打印任何窃听信息。这意味着我发布的消息甚至没有到达输入通道“processRequestSubscribableChannel”。我做错了什么?

    注意:我尝试删除配置中的“processRequestSubscribableChannel”bean(以便使用applicationContext中的实际“processRequestSubscribableChannel”)。我收到了一个不满意的依赖项错误-预计至少有1个配置PublishSubscribeChannel的bean。

    更新3:已发布请求的详细信息。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class QueuetoQueueTest  {
    
    //  Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
            @MockBean
            @Qualifier("jdbcTemplate")
            JdbcTemplate jdbcTemplate;
    
            @MockBean
            @Qualifier("dataSource")
            public DataSource dataSource;
    
            @MockBean
            @Qualifier("entityManager")
            public EntityManager entityManager;
    
            @Bean(name = "ResponseChannel")
            public PublishSubscribeChannel getReplyQueueChannel() {
                return new PublishSubscribeChannel();
            }
    
            //Mocking the DB class
            @MockBean
            @Qualifier("dbStoreDaoImpl")
            DBStoreDaoImpl  dbStoreDaoImpl ;
    
    
            //Inject the mock objects created above into the flow that stores data into the DB.
            @InjectMocks
            StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
    
    //Step 2 - Injecting MessageChannel used in the actual ApplicationContext
            @Autowired
            @Qualifier("processRequestSubscribableChannel")
            MessageChannel processRequestSubscribableChannel;
    
            @Before
            public void setUp() throws Exception {
                MockitoAnnotations.initMocks(this);
            }
    
            @Test
            public void outboundtoQueueTest() {
            try {
                when(dbStoreDaoImpl.storeToDB(any()))
                        .thenReturn(1);
    
                //create message
                Message message = (Message<?>) MessageBuilder
                        .withPayload("Hello")
                        .build();
                //send message - this channel is the actual channel used in ApplicationContext
                processRequestSubscribableChannel.send(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    更新1:StoretoDBConfig中的代码

    @Configuration
    @EnableIntegration
    public class StoretoDBConfig {
        @Autowired
        DataSource dataSource;
    
    /*
     * Below code is irrelevant to our current problem - Including for reference.
     * 
     * storing into DB is delegated to a separate thread.
     *
     *  @Bean
     *  public TaskExecutor taskExecutor() {
     *      return new SimpleAsyncTaskExecutor();
     *  }
     *  
     *  @Bean(name="executorChannelToDB")
     *  public ExecutorChannel outboundRequests() {
     *      return new ExecutorChannel(taskExecutor());
     *  }
     *  @Bean(name = "DBFailureChannel")
     *  public static MessageChannel getFailureChannel() {
     *      return new DirectChannel();
     *  }
     *  private static final Logger logger = Logger
     *              .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
    */
        @Bean
        public IntegrationFlow handle() {
        /*
         * Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
         *
         /
            return IntegrationFlows
                    .from("processRequestSubscribableChannel")
                    .channel("executorChannelToDB").get();
        }
    }
    

    存储在单独线程上的DB中的代码:

    @Repository
    public class DBStoreDaoImpl implements DBStoreDao {
        private JdbcTemplate jdbcTemplate;
    
        @Autowired
        public void setJdbcTemplate(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        @ServiceActivator(inputChannel = "executorChannelToDB")
        public void storetoDB(Message<?> msg) throws Exception {
                String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
                jdbcTemplate.update(insertQuery, msg.toString());
        }
    }
    
  • 共有1个答案

    夹谷承安
    2023-03-14

    请向我们展示订阅了该公共频道的内容。您的图表不知何故与您向我们展示的内容无关。您演示的代码未满。

    reportyChannel的真正问题是,某些东西确实必须向它发送消息。如果您的流只是单向的——发送、存储和没有返回,那么您确实不会为此获得任何东西。这就是为什么要显示这些通道适配器。

    观察消息过程的最佳方法是打开组织的调试日志记录。springframework。集成类别。

    虽然我看到您声明这些通道是在ContextConfiguration中的,并且实际上没有任何订阅者订阅getRequestChannel。因此,没有人会使用您的消息,当然,也没有人会向您发送回复。

    请重新考虑您的测试类,以使用真实的应用程序上下文。否则,如果你真的不测试你的流量,那么你想要实现什么是完全不清楚的。。。

     类似资料:
    • 我在我们的项目中引入了spring集成,而不是遗留集成架构。该体系结构支持发送者和累犯。每个发件人可以配置3个目的地。 null Spring integration gateways看起来很合适。我可以使用default-request-channel来表示主流,error-channel来表示失败流。备份流的问题。如何复制网关传入消息并将其放置到备份通道? 更准确地说,这里是一个测试和代码。

    • 但我得到的错误如下: POM: 我按以下方式配置入站网关: 并且,服务激活器: 顺便说一句,只有当我在服务激活器中删除outputChannel="outputChannel"时,它才有效。 这个问题有什么解释吗,我有什么误解吗?

    • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

    • 我想了解Spring集成中如何处理消息:串行或并行。特别是我有一个带有轮询器和HTTP出站网关的入站通道适配器。我猜拆分器、变压器、标头丰富器等不会产生自己的线程。 我可能错过了,但是这些细节在留档的某个地方指定了吗? 还可以通过编程方式获取系统中的所有频道吗?

    • 可以在运行时向spring integration dsl注册MessageSources吗? 在我的例子中,我想创建多个FileReadingMessageSources(基于UI的输入),然后将有效负载发送到特定的通道/jms路由(从元数据或用户输入读取) 另一个问题是,是否可以动态注册IntegrationFlows?

    • 我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。 到目前为止我所拥有的: 预期的行为如下: 通过网关发送第一个请求 一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。 我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是: