当前位置: 首页 > 知识库问答 >
问题:

Spring集成子集和Spring子集交互问题

王炜
2023-03-14

我创建了一个新示例,并将代码分为客户端和服务器端。

完整的代码可以在这里找到。

服务器端有3个版本。

  • 服务器无Spring Boot应用程序,使用Spring Integration RSocket InboundGateway
  • 服务器引导重用Spring RSocket autconfiguration,并通过serverrsocketmessagehandler创建ServerRSocketConnecter
  • 服务器引导消息映射不使用Spring集成,只使用Spring BootRSocket autconfiguration,以及控制器和消息映射

客户端有两个版本。

  • 客户端,使用Spring IntegrRocket OutancedGateway发送消息。
  • 客户端-请求者使用RSocketRequest发送消息,根本不使用Spring集成。

客户端和服务器交互方式REQUEST_CHANNEL,通过TCP/localhost:7000连接服务器。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

应用程序类:

@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

pom.xml.中的依赖性

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

应用属性

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

应用程序类。

@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

pom.xml.中的依赖性

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

应用程序类。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

在客户端中,pom.xml中的依赖关系就像。


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

应用程序类:


@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

运行客户端和服务器应用程序时,尝试访问http://localhost:8080/hello。

当使用服务器和服务器引导(使用InboundGateway处理消息)时,输出如下所示。

curl http://localhost:8080/hello

data:ABCD

使用服务器引导消息映射时,输出如我所料:

data:A
data:B
data:C
data:D
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

应用程序类:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

运行此客户端和3个服务器时,尝试通过curl访问http://localhost:8080/hello。

当使用服务器和服务器引导时,它会抛出一个类强制转换异常。

使用服务器引导消息映射时,输出如我所料:

data:A
data:B
data:C
data:D

我不知道是什么地方的问题的配置的无限网关和输出网关?

共有1个答案

赵渊
2023-03-14

感谢您提供如此详细的样品!

所以,我看到的。两个客户端(普通RSocketRequest和Spring集成)都可以与普通RSocket服务器很好地配合使用。

要使它们与Spring Integration server配合使用,您必须进行以下更改:

  1. 服务器端:

. Request estElementType(ResolvableType.for类(String.class))添加RSockets.inboundGateway()定义中,以便它知道转换传入有效负载的内容。

客户端:

<代码>。数据(通量。just(“a”、“b”、“c”、“d”))。

目前,Spring集成的服务器端并不将传入的流量视为独立的有效负载流。因此,我们尝试将所有这些连接到一个值中。新的行分隔符是一个指标,我们期望独立的值。Spring消息传递的作用正好相反:它检查多值预期类型,并在其映射()中对传入的通量中的每个元素进行解码,而不是对整个发布者进行解码。

这将是一个突破性的变化,但可能需要考虑修复RSocketInboundGateway逻辑,以与RSocket支持的常规消息映射保持一致。请随意提出GH问题!

 类似资料:
  • 问题内容: 我想使用Spring Batch和Spring Integration从数据库导入数据,并将它们写入文件,然后通过ftp将其传输到远程服务器。 但是我想我的问题是我不想为我的表创建域对象。我的查询是随机的,我想要一些可以读取数据并将其写入文件并进行传输的东西。 是否可以在不创建各自的域对象的情况下使用Spring Batch和Integration? 问题答案: 绝对。您可以将JDBC

  • 我试图访问facebook数据通过Spring的社会facebook集成使用的说明http://spring.io/guides/gs/accessing-facebook. 但目前我面临两类问题 > 当我运行教程中提到的示例时,我得到以下错误 没有为依赖项找到类型为[org.springframework.social.facebook.api.facebook]的匹配bean 当我在Faceb

  • 我正在尝试将Spring Security性与LDAP集成。使用spring core版本4.0.5,spring security版本3.2.2和spring ldap版本1.3.2。这是我的安全配置xml

  • 我在springboot项目1.5.10版中工作。释放 我正在为sftp使用Spring集成。以下gradle依赖项对我来说一切都很好 我还将普罗米修斯整合到模块中。 我已经完成了普罗米修斯的所有其他要求。但是我没有得到普罗米修斯的指标。 项目中的所有其他服务都在使用prometheus,但没有使用spring integration sftp,prometheus正在为所有这些服务工作。 我尝试

  • Spring是一个流行的Web框架,它提供易于集成与很多常见的网络任务。所以,问题是,为什么我们需要Spring,当我们有Struts2?Spring是超过一个MVC框架 - 它提供了许多其它好用的东西,这是不是在Struts。例如:依赖注入可以是有用的任何框架。在本章中,我们将通过一个简单的例子来看看如何集成Spring和Struts2一起。 首先,需要添加下列文件到项目的构建路径从Spring

  • 我正在尝试使用几个s-vaadin、jsp等实现一个应用程序。 它使用简单的,但后来我决定使用vaadin作为ui。 我创建了vaadin servlet(Spring的servlet也留下了)。我的看起来像这样 我创建了vaadin组件并根据我的需要对其进行了调整。我使用自动装配进行服务。 也是一个Spring bean。 < code>ProjectRepository是另一个spring b