当前位置: 首页 > 工具软件 > RSocket > 使用案例 >

springboot学习(四十) springboot下rsocket的使用

姜增
2023-12-01

RSocket RSocket是一个二进制的协议,以异步消息的方式提供4种对等的交互模型,以字节流的方式运行在TCP, WebSockets, Aeron等传输层之上。RSocket专门设计用于与Reactive风格应用配合使用,这些应用程序基本上是非阻塞的,并且通常(但不总是)与异步行为配对。它是传输无关的,支持 TCP、WebSocket和Aeron UDP协议,并支持无语义损失的混合传输协议——回压和流量控制仍然有效。
它还支持连接恢复。当你建立 RSocket 连接时,你可以指定前一个连接的 ID,如果流仍然在服务器的内存中,则你可以继续消费你的流。

一、RSocket与Servlet的对比
Servlet是一套Java的API规范,基于HTTP协议之上。主要功能提供HTTP服务的class,就是通过HTTP Request,处理后,最终调用HTTP Response完成输出!
RSocket自定义二进制协议,RSocket定位高性能通讯,比HTTP高非常多(号称10倍)

二、RSocket的交互模型
RSocket支持以下交互模型:

request/response (stream of 1)
request/stream (finite stream of many)
fire-and-forget (no response)
channel (bi-directional streams)

三、Demo
1、项目搭建
spring-boot-starter的引入略
创建两个模块client 和server
服务端:
依赖:


dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-rsocket'
}

创建一个User实体

package com.iscas.rsocket.server.samples.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

/**
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:53
 * @since jdk1.8
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    public static List<User> users = new ArrayList<>();
    static {
        users.add(new User(1, "zhangsan", 12));
        users.add(new User(2, "lisi", 22));
        users.add(new User(3, "wangwu", 25));
    }
    private Integer id;
    private String name;
    private int age;
}

配置文件

spring.rsocket.server.port=9789

启动类

package com.iscas.rsocket.server.samples;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 *
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:28
 * @since jdk1.8
 */
@SpringBootApplication
public class RsocketServerSamples {
    public static void main(String[] args) {
        SpringApplication.run(RsocketServerSamples.class, args);
    }
}

客户端:

依赖


dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-rsocket'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux'
}

创建一个User实体

package com.iscas.rsocket.client.samples.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

/**
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:53
 * @since jdk1.8
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private Integer id;
    private String name;
    private int age;
}

配置文件

server.port=9799

配置类

package com.iscas.rsocket.client.samples.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

/**
 * 客户端配置类
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:30
 * @since jdk1.8
 */
@Configuration
public class ClientConfiguration {
    
    @Bean
    RSocketRequester rSocketRequester(/*RSocketStrategies rSocketStrategies*/) {
        RSocketStrategies strategies = RSocketStrategies.builder()
//                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
//                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
                .build();

        RSocketRequester requester = RSocketRequester.builder()
                .rsocketStrategies(strategies)
                .tcp("localhost", 9789);

        return requester;
    }
}

启动类

package com.iscas.rsocket.client.samples;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 *
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:28
 * @since jdk1.8
 */
@SpringBootApplication
public class RsocketClientSamples {
    public static void main(String[] args) {
        SpringApplication.run(RsocketClientSamples.class, args);
        System.out.println(1111);
    }
}

2、request/response模式

HTTP也使用这种通信方式,这也是最常见的、最相似的交互模式。 在这种交互模式里, 由客户端初始化通信并发送一个请求。之后,服务器端执行操作并返回一个响应给客户端–这时通信完成。 在我们测试程序里, 一个客户端发送一个请求按照ID查询用户。 作为回复,服务器会返回这个ID对应的用户。

服务端的控制层

package com.iscas.rsocket.server.samples.reqres;

import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

import java.util.Objects;

/**
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 17:50
 * @since jdk1.8
 */

@Controller
public class ReqResServerController {
    @MessageMapping("findById")
    public Mono<User> currentMarketData(User user) {
        Integer id = user.getId();
        for (User user1 : User.users) {
            if (Objects.equals(user1.getId(), id)) {
                return Mono.just(user1);
            }
        }
        return Mono.empty();
    }
}

客户端控制层

package com.iscas.rsocket.client.samples.reqres;

import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ReqResClientController {

    private final RSocketRequester rSocketRequester;

    public ReqResClientController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/find/{id}")
    public Publisher<User> current(@PathVariable("id")Integer id) {
        return rSocketRequester
          .route("findById")
          .data(new User(id, null, 0))
          .retrieveMono(User.class);
    }
}

3、Fire/Forget模式

Fire And Forget交互模式。正如名字提示的一样,客户端发送一个请求给服务器,但是不期望服务器的返回响应回来。 在我们的测试程序中, 客户端请求实现用户新增。

服务端控制层

package com.iscas.rsocket.server.samples.fireforget;

import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

/**
 * FIRE/FORGET模式
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 18:59
 * @since jdk1.8
 */
@Controller
public class FireForgetServerController {

    @MessageMapping("add")
    public Mono<Void> collectMarketData(User user) {
        User.users.add(user);
        return Mono.empty();
    }
}

客户端控制层

package com.iscas.rsocket.client.samples.fireforget;

import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 19:03
 * @since jdk1.8
 */
@RestController
public class FireForgetClientController {
    private final RSocketRequester rSocketRequester;

    public FireForgetClientController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/add")
    public Publisher<Void> collect() {
        return rSocketRequester
                .route("add")
                .data(new User(11, "test-user1", 23))
                .send();
    }
}

4、Request/Stream模式

请求流是一种更复杂的交互模式, 这个模式中客户端发送一个请求,但是在一段时间内从服务器端获取到多个响应。测试程序实现获取所有用户的功能。

服务端控制层

package com.iscas.rsocket.server.samples.reqstream;

import com.iscas.rsocket.server.samples.model.User;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

/**
 *
 * Request/Stream模式
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 19:11
 * @since jdk1.8
 */
@Controller
public class ReqStreamServerController {
    @MessageMapping("getAll")
    public Flux<User> feedMarketData() {
        return Flux.fromStream(User.users.stream());
    }
}

客户端控制层

package com.iscas.rsocket.client.samples.reqstream;

import com.iscas.rsocket.client.samples.model.User;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

/**
 *
 * Request/Stream模式
 *
 * @author zhuquanwen
 * @vesion 1.0
 * @date 2021/2/6 19:11
 * @since jdk1.8
 */
@RestController
public class ReqStreamClientController {
    @Autowired
    private RSocketRequester rSocketRequester;
    @GetMapping(value = "/getAll", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Publisher<User> getAll() {
        return rSocketRequester
                .route("getAll")
//                .data()
                .retrieveFlux(User.class);
    }
}

 类似资料: