当前位置: 首页 > 知识库问答 >
问题:

如何使用Java注释设置Spring Integration协作通道适配器?

拓拔嘉运
2023-03-14

我看过其他与我的问题相关的帖子,但没有一个答案帮助我解决了我的问题。

我试图遵循下面的示例:https://github.com/garyrussell/spring-integration-samples/tree/master/medition/tcp-client-server-multiplex

显然,我的理解是错误的,因为我没有看到响应如何返回到网关,而且它不起作用。

我可以看到套接字正在打开,但它在消息发送后立即被关闭,因此反序列化器返回一个EOF:null错误。

>

  • 是否设置了TcpReceivingChannelAdapter错误?

    @EnableIntegration
    @IntegrationComponentScan
    @Configuration
    public class TcpMultiPlexConfig implements ApplicationListener<TcpConnectionEvent> {
    
        protected final static Logger LOGGER = LoggerFactory.getLogger(TcpMultiPlexConfig.class);
    
        @Value("${engine.port}")
        private int port;// = 55001;
        @Value("${engine.address}")
        private String ipAddress;// = "192.168.1.1";
        @Value("${engine.timeout}")
        private int timeout;
    
        @Override
        public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
            TcpConnection source = (TcpConnection) tcpEvent.getSource();
            if (tcpEvent instanceof TcpConnectionOpenEvent) {
                LOGGER.info("********* Socket Opened " + source.getConnectionId());
            } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
                LOGGER.info("*********** Socket Closed " + source.getConnectionId());
            }
        }
    
        @MessagingGateway(defaultRequestChannel="input")
        public interface MultiPlexGateway {
    
            String send(@Payload String in, @Header("CORRELATION_ID") String transactionId);
    
        }
        // TODO the request and response are being put together
        @Bean
        @ServiceActivator(inputChannel = "input")
        public BridgeHandler bridge() {
            BridgeHandler bridge = new BridgeHandler();
            bridge.setOutputChannelName("toAggregatorClient");
            bridge.setOrder(1);
            return bridge;
        }
    
        @Bean
        public PublishSubscribeChannel input() {
            return new PublishSubscribeChannel();
        }
    
        @Bean
        public DirectChannel toAggregatorClient() {
            return new DirectChannel();
        }
    
        @Bean
        public DirectChannel noResponseChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public DirectChannel toTransformerClient() {
            return new DirectChannel();
        }
    
        @Bean
        public TcpReceivingChannelAdapter inAdapterClient() {
            TcpReceivingChannelAdapter receivingAdapter = new TcpReceivingChannelAdapter();
            receivingAdapter.setConnectionFactory(clientConnectionFactory());
            receivingAdapter.setOutputChannel(toAggregatorClient());
            receivingAdapter.setClientMode(true);
            return receivingAdapter;
        }
    
    
        @Bean
        @ServiceActivator(inputChannel = "input")
        public TcpSendingMessageHandler outAdapterClient() {
            TcpSendingMessageHandler outAdapter = new TcpSendingMessageHandler();
            outAdapter.setOrder(2);
            outAdapter.setConnectionFactory(clientConnectionFactory());
            outAdapter.setClientMode(true);
            return outAdapter;
        }
    
        @Bean(name ="clientCFMP")
        public AbstractClientConnectionFactory clientConnectionFactory() {
            TcpNetClientConnectionFactory tcp = new TcpNetClientConnectionFactory(this.ipAddress , this.port);
            tcp.setSerializer(new DefaultSerializer()); // out
    //      byte delimeter = "\n".getBytes()[0];
    //      ElasticByteArrayRawSingleTerminatorSerializer deserializer = new ElasticByteArrayRawSingleTerminatorSerializer(delimeter);
    //      DefaultDeserializer deserializer = new DefaultDeserializer();
            MyDefaultDeserializer deserializer = new MyDefaultDeserializer();
            tcp.setDeserializer(deserializer);
    
            tcp.setSoTimeout(timeout);
            tcp.setSingleUse(false);
            MapMessageConverter mc = new MapMessageConverter();
            mc.setHeaderNames("CORRELATION_ID");
            tcp.setMapper(new MessageConvertingTcpMessageMapper(mc));
    
            return tcp;
        }
    
    
        @MessageEndpoint
        public static class MyConverters {
    
            @Transformer(inputChannel="toTransformerClient", outputChannel = "resultToString")
            public byte[] getResponse(MessageGroup payload) {
    //          byte[] result = null;
                List<Message<?>>list = new ArrayList<>(payload.getMessages());
                byte[] result = (byte[]) list.get(1).getPayload();
    //          LOGGER.info(result);
                return result;
            }
    
            @Transformer(inputChannel="resultToString")
            public String convertResult(byte[] bytes) {
                String result = new String(bytes);
                LOGGER.info("*********** RESULT => " + result);
                return result;
            }
    
            @ServiceActivator(inputChannel = "noResponseChannel")
            public MessageTimeoutException  noResponse(String input) {
                throw new MessageTimeoutException("****** No response received for => " + input);
            }
    
        }
    
    
    
        @Bean
        @ServiceActivator(inputChannel = "toAggregatorClient", outputChannel = "toTransformerClient")
        public FactoryBean<MessageHandler>  aggregatorFactoryBean() {
            AggregatorFactoryBean  afb = new AggregatorFactoryBean ();
            afb.setExpireGroupsUponCompletion(true);
            afb.setExpireGroupsUponTimeout(true);
            afb.setGroupTimeoutExpression(new ValueExpression<>(this.timeout));
            afb.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("CORRELATION_ID"));
            afb.setReleaseStrategy(new MessageCountReleaseStrategy(2));
            afb.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
            afb.setSendPartialResultOnExpiry(false);
            afb.setMessageStore(new SimpleMessageStore());
            afb.setDiscardChannel(noResponseChannel());
            return afb;
        }
    
    @Service
    public class MultiPlexGatewayTransmission <T extends EngineData> extends AbstractMultiPlexEngineTransmission {
    
        public MultiPlexGatewayTransmission(MultiPlexGateway gateway) {
            super(gateway);
        }
    
        @Override
        public T request(EngineData request, Class<? extends EngineData> clazz) {
            String response = gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
            gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
            if(response == null || response.isEmpty()) {
                return null;
            }
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("MPGateway response ::: " + response.trim());
            }
    
            @SuppressWarnings("unchecked")
            T clientResponse = (T) JaxbUtils.unmarshall(response, clazz);
            if (LOGGER.isDebugEnabled()) {
    //          LOGGER.debug("*** Unmarshall response ::: " + clientResponse);
            }
            return clientResponse;
        }
    

    测试用例:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ActiveProfiles("test")
    public class ITGetClientsTest extends AbstractEngineTest {
    
        private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    
    //  @Autowired
    //  private GatewayTransmission<ClientsResponse> transmission;
    
        @Autowired
        private MultiPlexGatewayTransmission<ClientsResponse> transmission;
    
        @Test
        public void testGetClients() {
            LOGGER.info("Gateway test testGetClients... ");
    
            Api api = new Api();
            api.setIp("192.168.1.1");
            api.setMessageId(UUID.randomUUID().toString());
            api.setVersion("1.0");      
            api.setUserToken(token);
    
            ClientsRequest request = new ClientsRequest();
            request.setApi(api);
    
            ClientsResponse response = (ClientsResponse) transmission.request(request, ClientsResponse.class);
            Assert.assertTrue(response != null);
            Assert.assertTrue(!response.getClient().isEmpty());
    
            LOGGER.info(Arrays.deepToString(response.getClient().toArray()));
        }
    
    
    
    }
    
  • 共有1个答案

    曹景铄
    2023-03-14

    我没有详细看你的代码;现在已经很晚了,而且是一个周末,但是请参阅下面的答案,了解使用入站/出站连接ID来关联请求/应答的更简单的技术。

     类似资料:
    • 如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。

    • 我发现了一个xml配置的入站适配器示例,但我并不完全理解。配置指定REST请求设置请求方法、使用的格式等。 我认为,从Spring集成的角度来看,响应应该更加重要,因为响应实际上是为消息通道提供信息的。我说得对吗? HTTP入站适配器用作消息endpoint(实际上是消息起始点),它调用HTTP请求,例如REST服务的URL。”http://myRest/transfer/next“-向SI消息通

    • 我正在重构一个传统的基于Spring Batch XML的应用程序,以使用注释配置。我想了解如何将以下XML文件转换为基于注释的配置,并保持相同的关注分离。 为了便于讨论,这里有一个简单的例子。 job-config-1.xml job-config-2.xml job-config-3。xml 我想从XML配置转移到Java配置。我想为每个XML创建3个作业配置类。比如说JobConfig1。j

    • 我想根据“配置文件”设置注释的值。 让我举个例子来解释; 在上面的例子中,我们可以看到活动的“配置文件”是PROD,但是假设我们想要使用DEV配置文件,我们将不得不注释来自PROD的@Table注释,并取消注释DEV@Table注释。 如果这只针对一个实体,那不会是一个问题,但我有很多实体都有这种情况,所以我不认为这是处理这种即兴“简介”的方式。 你知道有什么办法可以解决这种情况吗?

    • 问题内容: 为Java 6注释处理器设置Eclipse项目编译器配置的最佳方法是什么? 我的解决方案是手动设置和文件。这有点麻烦: 在factorypath文件中引用处理器jar 配置蚀注解处理器输出目录在属性) 将Eclipse注释处理器输出目录添加为源文件夹 一个问题是,Eclipse生成的源将使用maven进行编译。Only 是可靠的,因为它删除了Eclipse生成的源文件。(Eclipse

    • 我有两个片段:(1)图书馆片段,(2)书片段 图书馆碎片通过RecyclerView显示所有可用的书籍。用户可以在每个RecyclerView项目上设置标签,这将把LiveData设置为相应的图书。同时,书籍片段将被打开,并显示该书的内容。 我在ViewHolder类中设置了一个onClickListener,它位于图书馆片段的RecyclerView. Adapter中。因此,当单击一个项目时,