java-websocket连接多个websocket server 自定义springboot

童花蜂
2023-12-01

前言

websocket作为java后端与web端长链接的工具,一般来说java后端是作为server端存在的。像一些简易版的聊天室,都是通过java后端作为server端进行转发的。

但是有时候,java后端也可以作为客户端进行存在的。本文采用 java-websocket 这个工具类,讲述了如何使用 java后端搭建 springboot版本的websocket客户端。

实现如下机制:

1、提供websocket client 客户端
2、同时连接多个websocket 服务端
2、定时向server发送心跳
3、断开重连

一、开始进行编码

2.1 导入对应的依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

2.2 配置文件

无论是项目还是作为依赖包存在,springboot中配置文件都是很重要的存在。目前主要对以下几个属性进行定义

wsUrl: websocket服务端地址
heartbeatInterval: 心跳机制,一般websocket client 都需要心跳,避免断线
enableHeartbeat: 开启心跳
enableReconnection: 是否开启断开重连

但是由于需要连接多个server,故采用map的方式进行连接

application.yml

cancan:
  websocket:
    client:
      config:
        - wsUrl: ws://localhost:8082
          wsName: ws-01
          enableHeartbeat: true
          heartbeatInterval: 20000
          enableReconnection: true
        - wsUrl: ws://localhost:8083
          wsName: ws-02
          enableHeartbeat: true
          heartbeatInterval: 20000
          enableReconnection: true

WebsocketClientConfiguration

/**
 * @program: cancan-java-share
 * @description: Websocket客户端配置
 * @author: czchen
 * @date: 2022-06-29 15:03:48
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {

    private List<ServerProperties> config;

    public static class ServerProperties {
        /**
         * websocket server ws://ip:port
         */
        private String wsUrl;
        /**
         * websocket server name,用于区分不同的服务端
         */
        private String wsName;
        /**
         * 是否启用心跳监测 默认开启
         */
        private Boolean enableHeartbeat;
        /**
         * 心跳监测间隔 默认20000毫秒
         */
        private Integer heartbeatInterval;
        /**
         * 是否启用重连接 默认启用
         */
        private Boolean enableReconnection;

        public String getWsUrl() {
            return wsUrl;
        }

        public void setWsUrl(String wsUrl) {
            this.wsUrl = wsUrl;
        }

        public Boolean getEnableHeartbeat() {
            return enableHeartbeat;
        }

        public void setEnableHeartbeat(Boolean enableHeartbeat) {
            this.enableHeartbeat = enableHeartbeat;
        }

        public Integer getHeartbeatInterval() {
            return heartbeatInterval;
        }

        public void setHeartbeatInterval(Integer heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        public Boolean getEnableReconnection() {
            return enableReconnection;
        }

        public void setEnableReconnection(Boolean enableReconnection) {
            this.enableReconnection = enableReconnection;
        }

        public String getWsName() {
            return wsName;
        }

        public void setWsName(String wsName) {
            this.wsName = wsName;
        }
    }

    public List<ServerProperties> getConfig() {
        return config;
    }

    public void setConfig(List<ServerProperties> config) {
        this.config = config;
    }
}

2.3 websocket client 核心类

WebsocketRunClient.java

/**
 * @program: cancan-java-share
 * @description: websocket客户端
 * @author: czchen
 * @date: 2022-06-29 13:40:59
 */
@Slf4j
public class WebsocketRunClient extends WebSocketClient {
    /**
     * websocket连接名称
     */
    private String wsName;

    public WebsocketRunClient(URI serverUri,String wsName) {
        super(serverUri);
        this.wsName = wsName;
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocket {}] Websocket客户端连接成功",wsName);
    }

    @Override
    public void onMessage(String msg) {
        log.info("[websocket {}] 收到消息:{}",wsName,msg);
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocket {}] Websocket客户端关闭",wsName);
        System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
    }

    @Override
    public void onError(Exception e) {
        log.info("[websocket {}] Websocket客户端出现异常, 异常原因为:{}",wsName,e.getMessage());
    }
}

2.4 websocket client bean注入类

将websocket client 交给 spring 进行管理

由于我连接多个 websocket server 故需要采用map,不用的名称进行区别

WebsocketMultipleBeanConfig.java

/**
 * @program: cancan-java-share
 * @description: Websocket多客户端配置
 * @author: czchen
 * @date: 2022-06-29 15:58:19
 */
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {

    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){

        Map<String, WebsocketRunClient> retMap = new HashMap<>(5);

        List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();

        for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {

            String wsUrl = serverProperties.getWsUrl();
            String wsName = serverProperties.getWsName();
            Boolean enableReconnection = serverProperties.getEnableReconnection();
            Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
            Integer heartbeatInterval = serverProperties.getHeartbeatInterval();

            try {
                WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
                websocketRunClient.connect();
                websocketRunClient.setConnectionLostTimeout(0);

                new Thread(()->{
                    while (true){
                        try {
                            Thread.sleep(heartbeatInterval);
                            if(enableHeartbeat){
                                websocketRunClient.send("[websocket "+wsName+"] 心跳检测");
                                log.info("[websocket {}] 心跳检测",wsName);
                            }
                        } catch (Exception e) {
                            log.error("[websocket {}] 发生异常{}",wsName,e.getMessage());
                            try {
                                if(enableReconnection){
                                    log.info("[websocket {}] 重新连接",wsName);
                                    websocketRunClient.reconnect();
                                    websocketRunClient.setConnectionLostTimeout(0);
                                }
                            }catch (Exception ex){
                                log.error("[websocket {}] 重连异常,{}",wsName,ex.getMessage());
                            }
                        }
                    }
                }).start();

                retMap.put(wsName,websocketRunClient);
            } catch (URISyntaxException ex) {
                log.error("[websocket {}] 连接异常,{}",wsName,ex.getMessage());
            }
        }
        return retMap;

    }

}

2.5 发送信息到服务端

/**
 * @program: cancan-java-share
 * @description: Websocket客户端Controller
 * @author: czchen
 * @date: 2022-06-29 14:24:07
 */
@RestController
public class WebsocketClientController {

    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

    @RequestMapping("/send")
    public String send(){

        WebsocketRunClient websocketRunClient = websocketRunClientMap.get("ws-01");
        websocketRunClient.send("我是客户端,ws-01服务端你好啊!");

        return "发送成功";
    }

}

二、运行

2022-06-29 22:07:26.474  INFO 21528 --- [ctReadThread-29] c.c.s.websocket.WebsocketRunClient       : [websocket ws-01] Websocket客户端连接成功
2022-06-29 22:07:26.830  INFO 21528 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-29 22:07:26.847  INFO 21528 --- [           main] ngbootWebsocketClientMultipleApplication : Started SpringbootWebsocketClientMultipleApplication in 3.084 seconds (JVM running for 5.995)
2022-06-29 22:07:28.402  INFO 21528 --- [ctReadThread-31] c.c.s.websocket.WebsocketRunClient       : [websocket ws-02] Websocket客户端出现异常, 异常原因为:Connection refused: connect
2022-06-29 22:07:28.402  INFO 21528 --- [ctReadThread-31] c.c.s.websocket.WebsocketRunClient       : [websocket ws-02] Websocket客户端关闭
Connection closed by us Code: -1 Reason: Connection refused: connect
2022-06-29 22:07:46.350  INFO 21528 --- [       Thread-8] c.c.s.c.WebsocketMultipleBeanConfig      : [websocket ws-01] 心跳检测

两个服务端,一个连接得上,一个连接不上。连上的会发心跳,连不上的会按照周期重连

源码地址

 类似资料: