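RSocket
RSocket is a binary protocol that provides four symmetric interaction models through asynchronous message passing and runs over byte-stream transports such as TCP, WebSocket, and Aeron. It is designed for Reactive-style applications, which are fundamentally non-blocking and usually (but not always) paired with asynchronous behavior. It is transport-agnostic: it supports TCP, WebSocket, and Aeron UDP, and transports can be mixed without losing semantics, so backpressure and flow control keep working.
It also supports connection resumption. When you establish an RSocket connection, you can specify the ID of the previous connection, and if the stream's state is still in the server's memory, you can continue consuming your stream.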
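To make "binary protocol over a byte-stream transport" concrete: on TCP there are no message boundaries, so the RSocket specification prefixes each frame with a 24-bit big-endian length. The following is a minimal sketch of that framing only, not of a full RSocket frame. The class and method names are hypothetical, and the decoder assumes the buffer holds whole frames.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixedFraming {

    /** Prepends a 3-byte big-endian length (the 24-bit frame length) to one frame. */
    static byte[] encode(byte[] frame) {
        if (frame.length > 0xFFFFFF) {
            throw new IllegalArgumentException("frame does not fit in 24 bits");
        }
        ByteBuffer out = ByteBuffer.allocate(3 + frame.length);
        out.put((byte) (frame.length >>> 16));
        out.put((byte) (frame.length >>> 8));
        out.put((byte) frame.length);
        out.put(frame);
        return out.array();
    }

    /** Splits a buffer of whole length-prefixed frames back into individual frames. */
    static List<byte[]> decode(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(stream);
        while (in.remaining() >= 3) {
            int length = ((in.get() & 0xFF) << 16) | ((in.get() & 0xFF) << 8) | (in.get() & 0xFF);
            byte[] frame = new byte[length];
            in.get(frame); // throws BufferUnderflowException if the frame is truncated
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] wire = encode(new byte[] {1, 2, 3});
        System.out.println("bytes on the wire: " + wire.length);
        System.out.println("frames decoded: " + decode(wire).size());
    }
}
```

Message-oriented transports such as WebSocket already preserve frame boundaries, so this prefix is only needed on plain byte streams like TCP.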
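I. RSocket vs. Servlet
Servlet is a Java API specification built on top of HTTP. Its main job is to provide classes that serve HTTP: a servlet receives an HTTP request, processes it, and writes the result through the HTTP response.
RSocket defines its own binary protocol and is positioned for high-performance communication. Its performance is claimed to be far higher than HTTP's (a figure of 10x is often quoted).
II. RSocket interaction models
RSocket supports the following interaction models: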
request/response (stream of 1)
request/stream (finite stream of many)
fire-and-forget (no response)
channel (bi-directional streams)
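On the wire, each of the four models above is started by its own request frame type. The sketch below encodes the 6-byte frame header described in the RSocket protocol specification (a 31-bit stream ID followed by a 6-bit frame type and 10 flag bits), using the frame type codes the specification assigns to the four request frames. The enum and method names are hypothetical and chosen only for illustration.

```java
import java.nio.ByteBuffer;

public class InteractionModelFrames {

    /** The four interaction models and the frame type code that opens each one. */
    enum InteractionModel {
        REQUEST_RESPONSE(0x04),
        FIRE_AND_FORGET(0x05),
        REQUEST_STREAM(0x06),
        REQUEST_CHANNEL(0x07);

        final int frameType;

        InteractionModel(int frameType) {
            this.frameType = frameType;
        }
    }

    /** Encodes the frame header: 31-bit stream ID, 6-bit frame type, 10-bit flags. */
    static byte[] header(int streamId, InteractionModel model, int flags) {
        if (streamId < 0) {
            throw new IllegalArgumentException("stream ID must fit in 31 bits");
        }
        ByteBuffer buf = ByteBuffer.allocate(6); // ByteBuffer is big-endian by default
        buf.putInt(streamId);
        buf.putShort((short) ((model.frameType << 10) | (flags & 0x3FF)));
        return buf.array();
    }

    public static void main(String[] args) {
        for (InteractionModel model : InteractionModel.values()) {
            byte[] h = header(1, model, 0);
            System.out.printf("%-16s type/flags bytes: %02x %02x%n", model, h[4], h[5]);
        }
    }
}
```

Because every frame carries a stream ID, many requests of different models can be multiplexed over one connection.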
III. Demo
1. Project setup
Setting up the Spring Boot starter itself is omitted here.
Create two modules, client and server.
Server:
Dependencies:
dependencies {
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-rsocket'
}
Create a User entity
package com.iscas.rsocket.server.samples.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 17:53
* @since jdk1.8
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
public static List<User> users = new ArrayList<>();
static {
users.add(new User(1, "zhangsan", 12));
users.add(new User(2, "lisi", 22));
users.add(new User(3, "wangwu", 25));
}
private Integer id;
private String name;
private int age;
}
Configuration file
spring.rsocket.server.port=9789
Main class
package com.iscas.rsocket.server.samples;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 17:28
* @since jdk1.8
*/
@SpringBootApplication
public class RsocketServerSamples {
public static void main(String[] args) {
SpringApplication.run(RsocketServerSamples.class, args);
}
}
Client:
Dependencies:
dependencies {
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-rsocket'
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-webflux'
}
Create a User entity
package com.iscas.rsocket.client.samples.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/2/6 17:53
* @since jdk1.8
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private Integer id;
private String name;
private int age;
}
Configuration file
server.port=9799
Configuration class
package com.iscas.rsocket.client.samples.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
/**
* Client configuration class
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 17:30
* @since jdk1.8
*/
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(/*RSocketStrategies rSocketStrategies*/) {
RSocketStrategies strategies = RSocketStrategies.builder()
// .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
// .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
.decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 9789);
return requester;
}
}
Main class
package com.iscas.rsocket.client.samples;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 17:28
* @since jdk1.8
*/
@SpringBootApplication
public class RsocketClientSamples {
public static void main(String[] args) {
SpringApplication.run(RsocketClientSamples.class, args);
System.out.println(1111);
}
}
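2. Request/response mode
HTTP uses this style of communication too, which makes it the most common and most familiar interaction model. The client initiates the exchange by sending a request; the server performs the operation and returns a response to the client, at which point the exchange is complete. In our test program, the client sends a request to look up a user by ID, and the server replies with the user that has that ID.
Server-side controller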
package com.iscas.rsocket.server.samples.reqres;
import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
import java.util.Objects;
/**
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 17:50
* @since jdk1.8
*/
@Controller
public class ReqResServerController {
@MessageMapping("findById")
public Mono<User> findById(User user) {
Integer id = user.getId();
for (User user1 : User.users) {
if (Objects.equals(user1.getId(), id)) {
return Mono.just(user1);
}
}
return Mono.empty();
}
}
Client-side controller
package com.iscas.rsocket.client.samples.reqres;
import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ReqResClientController {
private final RSocketRequester rSocketRequester;
public ReqResClientController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/find/{id}")
public Publisher<User> current(@PathVariable("id")Integer id) {
return rSocketRequester
.route("findById")
.data(new User(id, null, 0))
.retrieveMono(User.class);
}
}
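3. Fire-and-forget mode
This is the fire-and-forget interaction model. As the name suggests, the client sends a request to the server but does not expect a response back. In our test program, the client's request adds a new user.
Server-side controller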
package com.iscas.rsocket.server.samples.fireforget;
import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
/**
* Fire-and-forget mode
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 18:59
* @since jdk1.8
*/
@Controller
public class FireForgetServerController {
@MessageMapping("add")
public Mono<Void> add(User user) {
User.users.add(user);
return Mono.empty();
}
}
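One thing to watch in the handler above: it appends to a shared static ArrayList, and RSocket requests may be handled on different threads, while ArrayList is not thread-safe. A minimal sketch of one possible alternative, assuming a store backed by CopyOnWriteArrayList, is shown below. The class name and the String elements are hypothetical stand-ins for the User list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentUserStore {

    // Thread-safe for concurrent adds; iteration works on a snapshot of the list.
    private final List<String> names = new CopyOnWriteArrayList<>();

    void add(String name) {
        names.add(name);
    }

    int size() {
        return names.size();
    }

    /** Adds threads * perThread entries from several threads and returns the final size. */
    static int addConcurrently(int threads, int perThread) {
        ConcurrentUserStore store = new ConcurrentUserStore();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    store.add("user-" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println("entries stored: " + addConcurrently(4, 250));
    }
}
```

CopyOnWriteArrayList copies the backing array on every write, so it suits a small, read-mostly list like the demo's; a write-heavy store would likely want a different structure.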
Client-side controller
package com.iscas.rsocket.client.samples.fireforget;
import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/2/6 19:03
* @since jdk1.8
*/
@RestController
public class FireForgetClientController {
private final RSocketRequester rSocketRequester;
public FireForgetClientController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/add")
public Publisher<Void> collect() {
return rSocketRequester
.route("add")
.data(new User(11, "test-user1", 23))
.send();
}
}
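4. Request/stream mode
Request/stream is a more complex interaction model: the client sends a single request but receives multiple responses from the server over a period of time. The test program uses it to fetch all users.
Server-side controller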
package com.iscas.rsocket.server.samples.reqstream;
import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
/**
*
* Request/stream mode
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 19:11
* @since jdk1.8
*/
@Controller
public class ReqStreamServerController {
@MessageMapping("getAll")
public Flux<User> getAll() {
return Flux.fromStream(User.users.stream());
}
}
Client-side controller
package com.iscas.rsocket.client.samples.reqstream;
import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
* Request/stream mode
*
* @author zhuquanwen
* @version 1.0
* @date 2021/2/6 19:11
* @since jdk1.8
*/
@RestController
public class ReqStreamClientController {
@Autowired
private RSocketRequester rSocketRequester;
@GetMapping(value = "/getAll", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<User> getAll() {
return rSocketRequester
.route("getAll")
// .data()
.retrieveFlux(User.class);
}
}
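Request/stream is where the backpressure mentioned in the introduction matters: the requester signals how many items it is ready for, and the responder must not send more than that. The sketch below illustrates that demand-driven contract with the JDK's java.util.concurrent.Flow interfaces (Java 9+, an assumption, since the demo targets jdk1.8) rather than Reactor or RSocket itself. The class names are hypothetical, and the publisher is deliberately simplified: it is synchronous, single-subscriber, and does not validate the requested count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandDrivenStream {

    /** Publishes the items synchronously, never emitting more than was requested. */
    static Flow.Publisher<String> publisherOf(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, stopping early if the source is exhausted.
                for (long i = 0; i < n && next < items.size() && !done; i++) {
                    subscriber.onNext(items.get(next++));
                }
                if (next == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Collects items and lets the caller pull them in batches. */
    static class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        void pull(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        publisherOf(Arrays.asList("zhangsan", "lisi", "wangwu")).subscribe(subscriber);
        subscriber.pull(2);
        System.out.println("after first pull: " + subscriber.received);
        subscriber.pull(2);
        System.out.println("after second pull: " + subscriber.received + ", completed=" + subscriber.completed);
    }
}
```

In RSocket, the same demand travels over the network: the request frame carries an initial request count and later REQUEST_N frames add to it, so a slow consumer of getAll can throttle the server.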