当前位置: 首页 > 知识库问答 >
问题:

Spring集成超时客户端

步兴德
2023-03-14

我对Spring集成的设想是:

  1. 使用自定义协议(大小和内容)发送数据的十个生产者
  2. 我必须解码这个自定义协议,然后处理结果。

所以我尝试了很多配置,目前最好的配置如下:

<bean id="serializer" class="com.MySerializerDeserializer" />
<task:executor id="myTaskExecutor" pool-size="5-300" queue-capacity="500000"/>
<int-ip:tcp-connection-factory id="serverTcpConFact"
    type="server"
    port="5566"
    using-nio="true"
    single-use="false"
    so-timeout="5000"
    task-executor="myTaskExecutor"
    deserializer="serializer" 
    serializer="serializer"/>

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
    channel="tcpInbound"
    connection-factory="serverTcpConFact" />

<int:channel id="tcpInbound" />

<int:service-activator input-channel="tcpInbound"
    ref="importService"
    method="handler" />

<bean id="importService" class="com.MyImportService" />

序列化类为:

public class MySerializerDeserializer implements Serializer< MyMessage >, Deserializer< MyMessage > {

    @Override
    public MyMessage deserialize(InputStream inputStream) throws IOException {
        DataInputStream dis = new DataInputStream(inputStream);
        int size = dis.readInt();
        byte[] b = new byte[size];
        dis.read(b);
        MyMessage s = new MyMessage ();
        String value = new String(b);
        s.setTest(value);
        return s;

我使用此代码来测试服务器:

class SimpleThread extends Thread {
    public void run() {
        try {
            String id = java.util.UUID.randomUUID().toString();
            try (Socket echoSocket = new Socket("localhost", 5566)) {
                DataOutputStream dos = new DataOutputStream(echoSocket.getOutputStream());
                for (int i = 0; i < 500; i++) {
                    String s = id + " " + i;
               dos.writeInt(s.length());
                    dos.write(s.getBytes());
                    dos.flush();
                    System.out.println(id + " - " + i + " - " + s.length());
                }
            }

当我用一个线程执行此操作时,如果我尝试执行多个线程,则效果很好,如:

  for (int i = 0; i < 5; i++) {
 new SimpleThread().start();
 }

spring集成服务器卡住了,我有以下警告:

WARN _[m [THREAD ID=myTaskExecutor-1] 2014-06-25 12:42:18 WARN  org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory:566 - Timing out TcpNioConnection 127.0.0.1:56273:5566:4e3caf61-1101-4881-a1cf-1c31610b33f3

而且它不工作,服务器无法接收消息。

我错在哪里?非常感谢。

编辑

我这样修改了线程轮询:

<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="50000" />
            <property name="rejectedExecutionHandler">
                <bean class="org.springframework.integration.util.CallerBlocksPolicy">
                    <constructor-arg value="10000" />
                </bean>
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="50000" />
            <property name="rejectedExecutionHandler">
                <bean class="org.springframework.integration.util.CallerBlocksPolicy">
                    <constructor-arg value="10000" />
                </bean>
            </property>
        </bean>
    </constructor-arg>
</bean>

服务器更有响应性,但我仍然有问题

**编辑*****

导入服务是:

public class ImportService {

    public void handler(MyMessage inp) {
        System.out.println(Thread.currentThread().getName() + "******" + inp.getTest());

    }

共有2个答案

吕征
2023-03-14

@JR您可能想看看这个与您类似的示例。

https://github.com/rajeshgheware/spring-integration-samples/tree/master/spring-integration-samples-tcp-concurrent-server

我成功地为3900个并发客户端测试了这段代码

卢志行
2023-03-14

我只是严格按照描述运行了您的测试(复制了您的代码),所有测试都按预期进行。500条消息传递到另一端。

我建议您打开跟踪级别日志记录,并可能向反序列化程序添加一些调试日志记录。

 类似资料:
  • 我试图将我的Spring应用程序作为发现客户端。但是当我添加以下依赖项时 GWT webapp因503服务不可用而无法启动。Netflix Eureka只能与Spring启动或Spring云应用程序一起使用吗?

  • 在我们的Spring应用程序中,我们依赖外部系统。我们希望为该系统的请求设置超时,但无法确定如何配置。 我们使用这个: 我已经尝试过这个:如何使用泽西2. x设置连接和读取超时?和许多其他建议,但无法让它工作。任何建议都将不胜感激。 不工作的更新: 也

  • 客户端集成 CAT推出多种语言的客户端,基本覆盖了主流开发语言。 CAT目前支持::Java、C、Python、node、Go等语言的接入。详情请参考:传送门 注意所有的客户端均在lib目录下,Java客户端也是,Java客户端不再使用cat-client这个模块,这个客户端无任何依赖。 Java C C++ Python Go Node.js 其它语言客户端接入: .NET 客户端 项目地址 c

  • 我试图测试Spring反应式Webclient的默认超时。为此,我创建了一个需要 10 小时才能返回响应的 rest endpoint。 我使用spring-reactive Webclient创建了一个rest客户端。但我看到,springReactiveWebclient一直在等待10个小时。 spring reactive Webclient没有任何默认超时吗?

  • 我有一个如下的集成,我从rest控制器调用这个方法,但回复超时并没有像我预期的那样工作。 我期望的是:如果在我给出的回复超时时间内没有响应,则返回timeout作为对客户端的响应。 对于通道配置中的超时持续时间,是否需要执行一些操作? 谢谢。

  • 我正在使用Apache HTTP客户端联系外部服务。这项服务可能需要几个小时(如果不是更长的话)才能产生响应。我尝试了一些不同的方法,但要么以套接字结束,要么以读取超时结束。我刚刚尝试使用RequestConfig将套接字和连接超时设置为0,根据文档,这应该是无限的,但请求总是在1小时后返回。有什么想法吗?