当前位置: 首页 > 知识库问答 >
问题:

Spring集成连接到服务器端口的多个客户端

孙子民
2023-03-14

从我的应用程序中,我需要配置多个需要连接到单个服务器的客户端连接。为此,我使用Application Context BeanFactory创建了可变数量的bean,具体取决于我配置了多少客户端。这是2个客户端的代码:

//setup beans;
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.scan("pkg");
ConnectionFactory factory = new ConnectionFactory();
int clients = 2; //TODO read this value from file
ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();
for (int count = 1; count <= clients; count++) {
    TcpNetClientConnectionFactory connectionFactory = factory.createClientConnectionFactory("127.0.0.1", 6680);

    //connection factory
    beanFactory.registerSingleton("connectionFactory_" + String.valueOf(count), connectionFactory);

    //inbound gateway
    MessageChannel input = new DirectChannel();
    MessageChannel output = new DirectChannel();
    TcpInboundGateway gateway = factory.createInboundGateway(connectionFactory, beanFactory, input, output, 10000, 20000);
    beanFactory.registerSingleton("gateway_" + String.valueOf(count), gateway);

    //message transformation and handling
    IntegrationFlow flow = factory.createFlow(input);
    beanFactory.registerSingleton("flow_" + String.valueOf(count), flow);
}
ctx.refresh();

//open connections
for(int count = 1; count <= clients; count++) {
    TcpInboundGateway gateway = ctx.getBean("gateway_" + count, TcpInboundGateway.class);
    //necessary for the client to connect
    gateway.retryConnection();
}

这是我的工厂方法:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class ConnectionFactory {      
    public TcpNetClientConnectionFactory createClientConnectionFactory(String ip, int port) {
        TcpNetClientConnectionFactory factory = new TcpNetClientConnectionFactory(ip, port);
        factory.setSingleUse(false);
        factory.setSoTimeout(10000);
        factory.setSerializer(new ByteArrayLfSerializer());
        factory.setDeserializer(new ByteArrayLfSerializer());

        return factory;
    }

    public TcpInboundGateway createInboundGateway(
        AbstractConnectionFactory factory,
        BeanFactory beanFactory,
        MessageChannel input,
        int replyTimeout,
        int retryInterval) {
        TcpInboundGateway gateway = new TcpInboundGateway();
        gateway.setRequestChannel(input);
        gateway.setConnectionFactory(factory);
        gateway.setClientMode(true);
        gateway.setReplyTimeout(replyTimeout);
        gateway.setRetryInterval(retryInterval);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();
        gateway.setTaskScheduler(scheduler);
        gateway.setBeanFactory(beanFactory);

        return gateway;
    }

    public IntegrationFlow createFlow(MessageChannel input) {
        IntegrationFlowBuilder builder = IntegrationFlows.from(input);
        builder.transform(Transformers.objectToString()).handle(System.out::println);

        return builder.get();
    }
}

当我运行程序时,两个客户端都连接到我的服务器。然而,一旦服务器将其第一个有效负载发送到每个客户端,我就会得到以下异常(每个客户端一个):

Exception sending message: GenericMessage [payload=byte[5], headers={ip_tcp_remotePort=6680, ip_connectionId=localhost:6680:33372:e26b9973-a32e-4c28-b808-1f2556576d01, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4443ca34-fb53-a753-7603-53f6d7d82e11, ip_hostname=localhost, timestamp=1464098102462}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    ... 14 common frames omitted

这个想法是数据将被读取,通过我为In边界网关配置的通道发送到变压器,然后变压器将数据转换为字符串,然后将其打印出来。

为什么框架不知道把数据放在哪个通道?据我所知,我确实在入站网关工厂方法中为每个客户端创建了一个唯一的通道。有人能看看我的配置,让我知道我错过了什么,因为我绝对被这个难倒了。

共有2个答案

马淇
2023-03-14

以下是简化的工作解决方案:

豆子。JAVA

package beanconfig;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
public class Beans {
    //Beans can be configured here
}

集成测试。JAVA

import org.junit.Test;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
import org.springframework.integration.transformer.ObjectToStringTransformer;
import org.springframework.messaging.MessageChannel;

public class IntegrationTest {
    private String generateComponentName(String baseName, int instanceCount) {
        return baseName + "_" + instanceCount;
    }

    @Test
    public void integrationTest1() throws Exception {
        try(AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext()) {
            ctx.scan("beanconfig");
            ctx.refresh();

            ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();

            int numberOfClients = 2; //TODO configure from file

            for (int count = 0; count < numberOfClients; count++) {
                //connection factory
                TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6680);
                connectionFactory.setSingleUse(false);
                connectionFactory.setSoTimeout(10000);
                connectionFactory.setSerializer(new ByteArrayLfSerializer());
                connectionFactory.setDeserializer(new ByteArrayLfSerializer());

                //inbound gateway
                TcpInboundGateway inboundGateway = new TcpInboundGateway();
                inboundGateway.setRequestChannel(new DirectChannel());
                inboundGateway.setConnectionFactory(connectionFactory);
                inboundGateway.setClientMode(true);
                inboundGateway.setReplyTimeout(10000);
                inboundGateway.setRetryInterval(20000);

                //message transformation and flow
                String flowName = generateComponentName("flow", count);
                IntegrationFlow flow = IntegrationFlows.from(inboundGateway)
                    .transform(new ObjectToStringTransformer())
                    .handle(h -> System.out.println("Message received: " + h.getPayload()))
                    .get();
                beanFactory.registerSingleton(flowName, flow);
                beanFactory.initializeBean(flow, flowName);
            }

            ctx.start();

            //TODO do proper validation here
            Thread.sleep(10000);
        }
    }
}

基本上,我最初的尝试有几个地方出了问题。以下是我为使其工作而做的更改:

1) 创建AnnotationConfigApplicationContext时,必须将配置类作为参数创建,该参数标记为@EnableIntegration注释。如果没有,则必须通过包含此注释的上下文扫描组件。我在第一次尝试时确实这样做了,但调用refresh太晚了,应该直接在ctx之后调用它。扫描因为我的ctx。refresh()是在我的beanfactory注册之后创建集成bean时,实际上没有设置@EnableIntegration。移动ctx。刷新()位于ctx正下方。scan()解决了这个问题。

2)注册到上下文中的每个bean也必须由beanFactory初始化。这是为了确保BeanPostProcers运行(这不是通过ynsterSingleton自动完成的)。

3)然后需要调用ctx.start()来启用在ctx.refresh()之后创建的bean。

鲍国兴
2023-03-14

没有人会使用来自网关的消息。setReplyChannel(输出)

至少我们没有看到这样的事情:

之后将打印出来。

在大多数情况下,如果您的某些SubscribableChannel没有任何订阅者,我们有Dispatcher没有订阅者:未配置或停止。

编辑

忘记我之前的表情。它适用于出站情况。

您的TcpInboundGateway很好。尽管您不需要设置ReplyChannel(),因为您始终可以依赖默认内置的临时ReplyChannel来等待来自下游流的一些结果。

你的IntegrationFlow看起来也不错。这是正确的<代码>。transform()不会向任何其他频道发送任何内容。它只依赖于标头中的临时ReplyChannel。

我认为您的问题是您没有为您的任何@Configuration类指定@EnableIntgraiton:http://docs.spring.io/spring-integration/reference/html/overview.html#_configuration

编辑2

参见GH问题。

因此,除了代码之外,您还需要:

>

* <p>The given instance is supposed to be fully initialized; the registry
* will not perform any initialization callbacks (in particular, it won't
* call InitializingBean's {@code afterPropertiesSet} method).

在ctx之后就可以这样做了。refresh()允许注册所有必要的BeanPostProcessor,包括一个用于Spring Integration Java DSL解析的BeanPostProcessor。

调用ctx。start()启动所有生命周期。因为常规ctx无法看到这些手动添加的新内容。refresh()处理。

 类似资料:
  • 我需要实现一个TCP服务器,它基本上应该在与客户端握手时打开一个套接字。 套接字打开后服务器需要保持套接字打开,并且能够通过打开的套接字将消息从服务器推送到客户端 我查看了一些spring集成示例,但不确定我所看到的示例是否确实参考了我的需求。 1. Spring集成tcp是否有这种能力来保持打开套接字并将消息从服务器发送到客户端? 服务器还应支持传入请求 客户端实现是作为简单Tcp java客户

  • 我一直在使用spring integration,我想连接多个ftp服务器来从远程位置检索文件,谁能给我一个好的例子,如何使用spring integration连接多个ftp服务器 先谢谢你,Udeshika

  • 我能够让eureka服务器以点对点模式运行。但我很好奇的一件事是如何让服务发现客户端注册到多个eureka服务器。 我的用例如下:<br>假设我有一个服务注册到其中一个eureka服务器(例如服务器a),并且该注册被复制到它的对等服务器。服务实际上指向服务器A。如果服务器A停机,客户机希望与服务器A续费,如果服务器A不再存在,续费如何工作。我是否需要同时向这两个服务器注册?如果不是,那么如果客户端

  • 我有一个示例Spring启动应用程序来运行图形QL服务器,具有作为客户端,我的pom有以下依赖项: 当我尝试从客户端连接时,出现以下错误: 狩猎决议好心建议。 我还有几个问题: 我应该使用SimpleGraphQLHttpServlet将请求路由到endpoint吗 我正在React UI上使用apollo client,那么它是强制使用apollo server还是spring boot可以工作

  • 下面是我的服务器代码: 下面是我的客户端活动代码: 以下是客户端活动的xml布局文件: 因此,我开始认为这不是连接端口的问题,而是应用程序的android客户端的问题。但我想不出有什么问题。 顺便说一下,当我试图发送消息时,运行客户端的手机和运行服务器的笔记本电脑都连接到了同一个网络。

  • 编辑2:我切换到,并用包装客户端工厂,这样设备就可以很好地到达后端。但是当后端发回一些东西时,我会得到错误的出站套接字,客户机套接字死亡。我认为这是因为后端没有正确路由消息所必需的头。如何捕获此信息?我的配置类如下: