我创建了一个新示例,并将代码分为客户端和服务器端。
完整的代码可以在这里找到。
服务器端有3个版本。
客户端有两个版本。
RSocketRequest
发送消息,根本不使用Spring集成。客户端和服务器交互方式REQUEST_CHANNEL,通过TCP/localhost:7000连接服务器。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
应用程序类:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
System.out.println("Press any key to exit.");
System.in.read();
} finally {
System.out.println("Exited.");
}
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 7000);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
pom.xml.中的依赖性
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
应用属性
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
应用程序类。
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(DemoApplication.class, args);
}
// see PR: https://github.com/spring-projects/spring-boot/pull/18834
@Bean
ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
var handler = new ServerRSocketMessageHandler(true);
handler.setRSocketStrategies(rSocketStrategies);
return handler;
}
@Bean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
return new ServerRSocketConnector(serverRSocketMessageHandler);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
pom.xml.中的依赖性
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
application.properties.
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
应用程序类。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Controller
class UpperCaseHandler {
@MessageMapping("/uppercase")
public Flux<String> uppercase(Flux<String> input) {
return input.map(String::toUpperCase);
}
}
在客户端中,pom.xml中的依赖关系就像。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
应用程序类:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
}
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
运行客户端和服务器应用程序时,尝试访问http://localhost:8080/hello。
当使用服务器和服务器引导(使用InboundGateway处理消息)时,输出如下所示。
curl http://localhost:8080/hello
data:ABCD
使用服务器引导消息映射时,输出如我所料:
data:A
data:B
data:C
data:D
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
应用程序类:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RestController
class HelloController {
Mono<RSocketRequester> requesterMono;
public HelloController(RSocketRequester.Builder builder) {
this.requesterMono = builder.connectTcp("localhost", 7000);
}
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return requesterMono.flatMapMany(
rSocketRequester -> rSocketRequester.route("/uppercase")
.data(Flux.just("a", "b", "c", "d"))
.retrieveFlux(String.class)
);
}
}
运行此客户端和3个服务器时,尝试通过curl访问http://localhost:8080/hello。
当使用服务器和服务器引导时,它会抛出一个类强制转换异常。
使用服务器引导消息映射时,输出如我所料:
data:A
data:B
data:C
data:D
我不知道是什么地方的问题的配置的无限网关和输出网关?
感谢您提供如此详细的样品!
所以,我看到的。两个客户端(普通RSocketRequest
和Spring集成)都可以与普通RSocket服务器很好地配合使用。
要使它们与Spring Integration server配合使用,您必须进行以下更改:
将. Request estElementType(ResolvableType.for类(String.class))
添加到RSockets.inboundGateway()
定义中,以便它知道转换传入有效负载的内容。
客户端:
<代码>。数据(通量。just(“a”、“b”、“c”、“d”))。
目前,Spring集成的服务器端并不将传入的流量视为独立的有效负载流。因此,我们尝试将所有这些连接到一个值中。新的行分隔符是一个指标,我们期望独立的值。Spring消息传递的作用正好相反:它检查多值预期类型,并在其映射()中对传入的通量中的每个元素进行解码,而不是对整个发布者进行解码。
这将是一个突破性的变化,但可能需要考虑修复RSocketInboundGateway逻辑,以与RSocket支持的常规消息映射保持一致。请随意提出GH问题!
问题内容: 我想使用Spring Batch和Spring Integration从数据库导入数据,并将它们写入文件,然后通过ftp将其传输到远程服务器。 但是我想我的问题是我不想为我的表创建域对象。我的查询是随机的,我想要一些可以读取数据并将其写入文件并进行传输的东西。 是否可以在不创建各自的域对象的情况下使用Spring Batch和Integration? 问题答案: 绝对。您可以将JDBC
我试图访问facebook数据通过Spring的社会facebook集成使用的说明http://spring.io/guides/gs/accessing-facebook. 但目前我面临两类问题 > 当我运行教程中提到的示例时,我得到以下错误 没有为依赖项找到类型为[org.springframework.social.facebook.api.facebook]的匹配bean 当我在Faceb
我正在尝试将Spring Security性与LDAP集成。使用spring core版本4.0.5,spring security版本3.2.2和spring ldap版本1.3.2。这是我的安全配置xml
我在springboot项目1.5.10版中工作。释放 我正在为sftp使用Spring集成。以下gradle依赖项对我来说一切都很好 我还将普罗米修斯整合到模块中。 我已经完成了普罗米修斯的所有其他要求。但是我没有得到普罗米修斯的指标。 项目中的所有其他服务都在使用prometheus,但没有使用spring integration sftp,prometheus正在为所有这些服务工作。 我尝试
Spring是一个流行的Web框架,它提供易于集成与很多常见的网络任务。所以,问题是,为什么我们需要Spring,当我们有Struts2?Spring是超过一个MVC框架 - 它提供了许多其它好用的东西,这是不是在Struts。例如:依赖注入可以是有用的任何框架。在本章中,我们将通过一个简单的例子来看看如何集成Spring和Struts2一起。 首先,需要添加下列文件到项目的构建路径从Spring
我正在尝试使用几个s-vaadin、jsp等实现一个应用程序。 它使用简单的,但后来我决定使用vaadin作为ui。 我创建了vaadin servlet(Spring的servlet也留下了)。我的看起来像这样 我创建了vaadin组件并根据我的需要对其进行了调整。我使用自动装配进行服务。 也是一个Spring bean。 < code>ProjectRepository是另一个spring b