//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE
package com.test.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import java.io.IOException;
@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
@SuppressWarnings("unused")
@Value("${tcp.connection.port}")
private int tcpPort;
@Bean
TcpConnectionEventListener customerTcpListener() {
return new TcpConnectionEventListener();
}
@Bean
public MessageChannel tcpIn() {
return new DirectChannel();
}
@Bean
public MessageChannel serviceChannel() {
return new DirectChannel();
}
@ConditionalOnMissingBean(name = "errorChannel")
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel exceptionEventChannel() {
return new DirectChannel();
}
@Bean
public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
return byteArrayLengthPrefixSerializer;
}
@Bean
public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
return tcpServerCf;
}
@Bean
public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(tcpNetServerConnectionFactory());
adapter.setOutputChannel(tcpIn());
adapter.setErrorChannel(exceptionEventChannel());
return adapter;
}
@ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
public String handle(Message<MessagingException> msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------EXCEPTION ==> " + msg);
return msg.toString();
}
@Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
public String transformer(String msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------ERROR ==> " + msg);
return msg.toString();
}
@ServiceActivator(inputChannel = "serviceChannel")
@Bean
public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(cf);
return tcpSendingMessageHandler;
}
@Bean
public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
return new ApplicationListener<TcpDeserializationExceptionEvent>() {
@Override
public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
.build());
}
};
}
}
@Component
public class TcpServiceActivator {
@Autowired
public TcpServiceActivator() {
}
@ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
public String service(byte[] byteMessage) {
// Business Logic returns String Ack Response
}
但是,当我试图模拟一个异常(例如反序列化异常)时,异常消息不会作为响应发送回Tcp客户端。我可以看到我的应用程序侦听器正在获取TcpDeserializationExceptionEvent,并将消息发送到ExceptionEventChannel。@ServiceActivator方法句柄(Message msg)也打印我的异常消息。但它从未到达MessageHandler方法out(AbstractServerConnectionFactory cf)内的断点(在调试模式下)。
我很难理解哪里出了问题。提前谢谢你的帮助。
更新:我注意到,在可以发送响应之前,套接字由于异常而关闭。我在想办法解决这个问题
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
boolean isValidMessage = false;
try {
int messageLength = this.readPrefix(inputStream);
if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
return this.copyToSizedArray(buffer, messageLength);
}
return EventType.MSG_INVALID.getName().getBytes();
} catch (SoftEndOfStreamException eose) {
return EventType.MSG_INVALID.getName().getBytes();
}
}
我的@路由器和“错误处理”@ServiceActivator-
@Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
public String messageRouter(byte[] byteMessage) {
String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("------------------> "+unfilteredMessage);
if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
return "errorChannel";
}
return "serviceChannel";
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
public String errorHandler(byte[] byteMessage) {
return Message.ACK_RETRY;
}
错误通道用于处理在处理消息时发生的异常。反序列化错误发生在创建消息之前(反序列化器对消息的有效负载进行解码)。
反序列化异常是致命的,正如您所观察到的,套接字是关闭的。
一种选择是在反序列化器中捕获异常并返回一个“特殊”值,该值指示发生了反序列化异常,然后在主线流中检查该值。
获取此错误: [错误] 1118#1118: *366 FastCGI在stderr中发送:" PHP消息:PHP致命错误:未捕获的异常' Predis \ Response \ server exception '以及消息' MOVED 7369 10.0.213.16:6379 ' 我的Redis连接码是: 注意:上面的代码在本地运行良好,但是在我的AWS服务器上不能运行。任何帮助都将不胜感激
我正在尝试开始登录ElasticSearch,但在服务器中找不到日志,如果启用诊断模式,将引发异常: 我的创业公司就是这样。cs看起来: 以下是我的rogram.cs 这是我在控制器里的东西: 我得到的例外是: 2021-08-19T21:09:15.2793348Z未能创建模板。弹性搜索。网ElasticsearchClientException:请求未能执行。调用:状态代码400来自:PUT/
我的spring boot应用程序就像一个中间人。它等待一个请求,然后格式化该请求并将其发送给服务器,并将服务器响应返回给请求发送方。然而,当我从服务器获得响应错误响应(例如,状态代码为400,错误请求)时,我想通过添加从服务器以JSON格式返回的错误原因来修改默认的spring boot JSON异常体。 来自服务器的响应: spring boot返回异常: 我想用服务器返回的“error_me
如何测试接收并返回
我正在尝试创建一个TCP服务器,该服务器在端口5002上接受来自外部程序的消息。但是,它不接收来自外部程序的消息。 为了验证我的TCP服务器是否正常工作,我像这样使用了telnet,程序确实收到了文本“hello”。 设置wireshark时,我可以看到计算机正在端口5002上接收来自外部程序(我期待)的消息。为什么我的程序无法接收这些消息? 关于最终解决方案的最新情况: 由于负载没有停止线,我必
我正在将本地netty服务器连接到远程https服务器以代理请求。 下面是我如何创建ssLcontext bean 当我点击我的本地主机时,它应该代理到后端。我得到以下异常。但是,如果SSL关闭并且我连接到远程,这是在不同端口上运行的本地服务器,则工作正常 编辑 添加的日志