当前位置: 首页 > 知识库问答 >
问题:

Spring-Integration:异常时Tcp服务器响应未发送

席成仁
2023-03-14

//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE

package com.test.config;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;

import java.io.IOException;

@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
    @SuppressWarnings("unused")
    @Value("${tcp.connection.port}")
    private int tcpPort;

    @Bean
    TcpConnectionEventListener customerTcpListener() {
        return new TcpConnectionEventListener();
    }

    @Bean
    public MessageChannel tcpIn() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel serviceChannel() {
        return new DirectChannel();
    }

    @ConditionalOnMissingBean(name = "errorChannel")
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel exceptionEventChannel() {
        return new DirectChannel();
    }

    @Bean
    public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
        ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
        byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
        return byteArrayLengthPrefixSerializer;
    }

    @Bean
    public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
        tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
        tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
        return tcpServerCf;

    }

    @Bean
    public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(tcpNetServerConnectionFactory());
        adapter.setOutputChannel(tcpIn());
        adapter.setErrorChannel(exceptionEventChannel());
        return adapter;
    }

    @ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
    public String handle(Message<MessagingException> msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------EXCEPTION ==> " + msg);
        return msg.toString();
    }

    @Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
    public String transformer(String msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------ERROR ==> " + msg);
        return msg.toString();
    }


    @ServiceActivator(inputChannel = "serviceChannel")
    @Bean
    public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf);
        return tcpSendingMessageHandler;
    }


    @Bean
    public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
        return new ApplicationListener<TcpDeserializationExceptionEvent>() {

            @Override
            public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
                exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
                        .build());
            }

        };
    }
}
@Component
public class TcpServiceActivator {

    @Autowired
    public TcpServiceActivator() {
    }


    @ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
    public String service(byte[] byteMessage) {
     // Business Logic returns String Ack Response
    }

但是,当我试图模拟一个异常(例如反序列化异常)时,异常消息不会作为响应发送回Tcp客户端。我可以看到我的应用程序侦听器正在获取TcpDeserializationExceptionEvent,并将消息发送到ExceptionEventChannel。@ServiceActivator方法句柄(Message msg)也打印我的异常消息。但它从未到达MessageHandler方法out(AbstractServerConnectionFactory cf)内的断点(在调试模式下)。

我很难理解哪里出了问题。提前谢谢你的帮助。

更新:我注意到,在可以发送响应之前,套接字由于异常而关闭。我在想办法解决这个问题

    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        boolean isValidMessage = false;
        try {
            int messageLength = this.readPrefix(inputStream);
            if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
                return this.copyToSizedArray(buffer, messageLength);
            }
            return EventType.MSG_INVALID.getName().getBytes(); 
        } catch (SoftEndOfStreamException eose) {
            return EventType.MSG_INVALID.getName().getBytes();
        }
    }

我的@路由器和“错误处理”@ServiceActivator-

    @Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
    public String messageRouter(byte[] byteMessage) {
        String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("------------------> "+unfilteredMessage);
        if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
            return "errorChannel";
        }
        return "serviceChannel";
    }


    @ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
    public String errorHandler(byte[] byteMessage) {
        return Message.ACK_RETRY;
    }

共有1个答案

沈骞仕
2023-03-14

错误通道用于处理在处理消息时发生的异常。反序列化错误发生在创建消息之前(反序列化器对消息的有效负载进行解码)。

反序列化异常是致命的,正如您所观察到的,套接字是关闭的。

一种选择是在反序列化器中捕获异常并返回一个“特殊”值,该值指示发生了反序列化异常,然后在主线流中检查该值。

 类似资料:
  • 获取此错误: [错误] 1118#1118: *366 FastCGI在stderr中发送:" PHP消息:PHP致命错误:未捕获的异常' Predis \ Response \ server exception '以及消息' MOVED 7369 10.0.213.16:6379 ' 我的Redis连接码是: 注意:上面的代码在本地运行良好,但是在我的AWS服务器上不能运行。任何帮助都将不胜感激

  • 我正在尝试开始登录ElasticSearch,但在服务器中找不到日志,如果启用诊断模式,将引发异常: 我的创业公司就是这样。cs看起来: 以下是我的rogram.cs 这是我在控制器里的东西: 我得到的例外是: 2021-08-19T21:09:15.2793348Z未能创建模板。弹性搜索。网ElasticsearchClientException:请求未能执行。调用:状态代码400来自:PUT/

  • 我的spring boot应用程序就像一个中间人。它等待一个请求,然后格式化该请求并将其发送给服务器,并将服务器响应返回给请求发送方。然而,当我从服务器获得响应错误响应(例如,状态代码为400,错误请求)时,我想通过添加从服务器以JSON格式返回的错误原因来修改默认的spring boot JSON异常体。 来自服务器的响应: spring boot返回异常: 我想用服务器返回的“error_me

  • 如何测试接收并返回

  • 我正在尝试创建一个TCP服务器,该服务器在端口5002上接受来自外部程序的消息。但是,它不接收来自外部程序的消息。 为了验证我的TCP服务器是否正常工作,我像这样使用了telnet,程序确实收到了文本“hello”。 设置wireshark时,我可以看到计算机正在端口5002上接收来自外部程序(我期待)的消息。为什么我的程序无法接收这些消息? 关于最终解决方案的最新情况: 由于负载没有停止线,我必

  • 我正在将本地netty服务器连接到远程https服务器以代理请求。 下面是我如何创建ssLcontext bean 当我点击我的本地主机时,它应该代理到后端。我得到以下异常。但是,如果SSL关闭并且我连接到远程,这是在不同端口上运行的本地服务器,则工作正常 编辑 添加的日志