websocket作为java后端与web端建立长连接的工具,一般来说java后端是作为server端存在的。像一些简易版的聊天室,都是通过java后端作为server端进行转发的。
但是有时候,java后端也可以作为客户端进行存在的。本文采用 java-websocket 这个工具类,讲述了如何使用 java后端搭建 springboot版本的websocket客户端。
实现如下机制:
1、提供websocket client 客户端
2、同时连接多个websocket 服务端
3、定时向server发送心跳
4、断开重连
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- 1.2.83 is the release that addresses the autoType bypass (CVE-2022-25845) affecting earlier 1.2.x versions -->
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
无论是项目还是作为依赖包存在,springboot中配置文件都是很重要的存在。目前主要对以下几个属性进行定义
wsUrl: websocket服务端地址
heartbeatInterval: 心跳机制,一般websocket client 都需要心跳,避免断线
enableHeartbeat: 开启心跳
enableReconnection: 是否开启断开重连
但是由于需要连接多个server,故采用list(列表)的方式进行配置,每个元素对应一个服务端,并通过 wsName 区分
application.yml
# One list entry per websocket server; bound to WebsocketClientConfiguration.config
cancan:
  websocket:
    client:
      config:
        - wsUrl: ws://localhost:8082
          wsName: ws-01
          enableHeartbeat: true
          heartbeatInterval: 20000
          enableReconnection: true
        - wsUrl: ws://localhost:8083
          wsName: ws-02
          enableHeartbeat: true
          heartbeatInterval: 20000
          enableReconnection: true
WebsocketClientConfiguration
/**
 * Binds the {@code cancan.websocket.client} section of the application
 * configuration. Each element of {@code config} describes one websocket
 * server this application connects to as a client.
 *
 * @program: cancan-java-share
 * @description: Websocket client configuration
 * @author: czchen
 * @date: 2022-06-29 15:03:48
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {

    /**
     * Configured servers. Starts out as an empty, mutable list so that callers
     * can iterate safely even when the section is missing from the configuration.
     */
    private List<ServerProperties> config = new java.util.ArrayList<>();

    /**
     * Connection settings for a single websocket server.
     */
    public static class ServerProperties {

        /**
         * Websocket server address, e.g. ws://ip:port
         */
        private String wsUrl;

        /**
         * Websocket server name, used to tell the different servers apart
         */
        private String wsName;

        /**
         * Whether the heartbeat is enabled; enabled by default
         */
        private Boolean enableHeartbeat = Boolean.TRUE;

        /**
         * Heartbeat interval in milliseconds; 20000 by default
         */
        private Integer heartbeatInterval = 20000;

        /**
         * Whether reconnection is enabled; enabled by default
         */
        private Boolean enableReconnection = Boolean.TRUE;

        /** @return the websocket server address */
        public String getWsUrl() {
            return wsUrl;
        }

        /** @param wsUrl the websocket server address */
        public void setWsUrl(String wsUrl) {
            this.wsUrl = wsUrl;
        }

        /** @return whether the heartbeat is enabled */
        public Boolean getEnableHeartbeat() {
            return enableHeartbeat;
        }

        /** @param enableHeartbeat whether the heartbeat is enabled */
        public void setEnableHeartbeat(Boolean enableHeartbeat) {
            this.enableHeartbeat = enableHeartbeat;
        }

        /** @return the heartbeat interval in milliseconds */
        public Integer getHeartbeatInterval() {
            return heartbeatInterval;
        }

        /** @param heartbeatInterval the heartbeat interval in milliseconds */
        public void setHeartbeatInterval(Integer heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        /** @return whether reconnection is enabled */
        public Boolean getEnableReconnection() {
            return enableReconnection;
        }

        /** @param enableReconnection whether reconnection is enabled */
        public void setEnableReconnection(Boolean enableReconnection) {
            this.enableReconnection = enableReconnection;
        }

        /** @return the name identifying this server */
        public String getWsName() {
            return wsName;
        }

        /** @param wsName the name identifying this server */
        public void setWsName(String wsName) {
            this.wsName = wsName;
        }
    }

    /** @return the configured servers */
    public List<ServerProperties> getConfig() {
        return config;
    }

    /** @param config the configured servers */
    public void setConfig(List<ServerProperties> config) {
        this.config = config;
    }
}
WebsocketRunClient.java
/**
 * Websocket client that tags every log line with the name of the server
 * connection it belongs to.
 *
 * @program: cancan-java-share
 * @description: websocket client
 * @author: czchen
 * @date: 2022-06-29 13:40:59
 */
@Slf4j
public class WebsocketRunClient extends WebSocketClient {

    /**
     * Name of this websocket connection
     */
    private final String wsName;

    /**
     * @param serverUri address of the websocket server to connect to
     * @param wsName    name of this connection, used as a prefix in log output
     */
    public WebsocketRunClient(URI serverUri, String wsName) {
        super(serverUri);
        this.wsName = wsName;
    }

    /** Logs that the connection has been established. */
    @Override
    public void onOpen(ServerHandshake handshake) {
        log.info("[websocket {}] Websocket客户端连接成功", this.wsName);
    }

    /** Logs each text message received from the server. */
    @Override
    public void onMessage(String message) {
        log.info("[websocket {}] 收到消息:{}", this.wsName, message);
    }

    /** Logs the close event and prints which side closed, the code and the reason to stdout. */
    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocket {}] Websocket客户端关闭", this.wsName);

        final String closedBy;
        if (remote) {
            closedBy = "remote peer";
        } else {
            closedBy = "us";
        }

        StringBuilder line = new StringBuilder("Connection closed by ");
        line.append(closedBy);
        line.append(" Code: ").append(code);
        line.append(" Reason: ").append(reason);
        System.out.println(line.toString());
    }

    /** Logs the message of any exception reported for this connection. */
    @Override
    public void onError(Exception ex) {
        String cause = ex.getMessage();
        log.info("[websocket {}] Websocket客户端出现异常, 异常原因为:{}", this.wsName, cause);
    }
}
将websocket client 交给 spring 进行管理
由于我连接多个 websocket server,故需要采用map,用不同的名称进行区分
WebsocketMultipleBeanConfig.java
/**
* @program: cancan-java-share
* @description: Websocket多客户端配置
* @author: czchen
* @date: 2022-06-29 15:58:19
*/
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {
@Bean
public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){
Map<String, WebsocketRunClient> retMap = new HashMap<>(5);
List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();
for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {
String wsUrl = serverProperties.getWsUrl();
String wsName = serverProperties.getWsName();
Boolean enableReconnection = serverProperties.getEnableReconnection();
Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
Integer heartbeatInterval = serverProperties.getHeartbeatInterval();
try {
WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
websocketRunClient.connect();
websocketRunClient.setConnectionLostTimeout(0);
new Thread(()->{
while (true){
try {
Thread.sleep(heartbeatInterval);
if(enableHeartbeat){
websocketRunClient.send("[websocket "+wsName+"] 心跳检测");
log.info("[websocket {}] 心跳检测",wsName);
}
} catch (Exception e) {
log.error("[websocket {}] 发生异常{}",wsName,e.getMessage());
try {
if(enableReconnection){
log.info("[websocket {}] 重新连接",wsName);
websocketRunClient.reconnect();
websocketRunClient.setConnectionLostTimeout(0);
}
}catch (Exception ex){
log.error("[websocket {}] 重连异常,{}",wsName,ex.getMessage());
}
}
}
}).start();
retMap.put(wsName,websocketRunClient);
} catch (URISyntaxException ex) {
log.error("[websocket {}] 连接异常,{}",wsName,ex.getMessage());
}
}
return retMap;
}
}
/**
 * Demo endpoint that sends a fixed text message to the {@code ws-01} server
 * through the matching websocket client.
 *
 * @program: cancan-java-share
 * @description: Websocket client controller
 * @author: czchen
 * @date: 2022-06-29 14:24:07
 */
@RestController
public class WebsocketClientController {

    /** Name of the websocket connection this endpoint sends to. */
    private static final String TARGET_WS_NAME = "ws-01";

    /** Websocket clients keyed by their configured name. */
    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

    /**
     * Sends a greeting to the {@code ws-01} server.
     *
     * @return a success message, or a failure message when the client is
     *         missing or its connection is not open
     */
    @RequestMapping("/send")
    public String send(){
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(TARGET_WS_NAME);
        // The client is absent when its URL was invalid, and send() throws when the
        // connection is not open; report both instead of failing the request.
        if (websocketRunClient == null || !websocketRunClient.isOpen()) {
            return "发送失败:" + TARGET_WS_NAME + " 未连接";
        }
        websocketRunClient.send("我是客户端,ws-01服务端你好啊!");
        return "发送成功";
    }
}
2022-06-29 22:07:26.474 INFO 21528 --- [ctReadThread-29] c.c.s.websocket.WebsocketRunClient : [websocket ws-01] Websocket客户端连接成功
2022-06-29 22:07:26.830 INFO 21528 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-29 22:07:26.847 INFO 21528 --- [ main] ngbootWebsocketClientMultipleApplication : Started SpringbootWebsocketClientMultipleApplication in 3.084 seconds (JVM running for 5.995)
2022-06-29 22:07:28.402 INFO 21528 --- [ctReadThread-31] c.c.s.websocket.WebsocketRunClient : [websocket ws-02] Websocket客户端出现异常, 异常原因为:Connection refused: connect
2022-06-29 22:07:28.402 INFO 21528 --- [ctReadThread-31] c.c.s.websocket.WebsocketRunClient : [websocket ws-02] Websocket客户端关闭
Connection closed by us Code: -1 Reason: Connection refused: connect
2022-06-29 22:07:46.350 INFO 21528 --- [ Thread-8] c.c.s.c.WebsocketMultipleBeanConfig : [websocket ws-01] 心跳检测
两个服务端,一个连接得上,一个连接不上。连上的会发心跳,连不上的会按照周期重连