本文使用netty-socketio实现类似websocket的消息推送,并通过命名空间实现用户隔离。
WebSocket是一种通信协议,它通过TCP连接在客户端和服务器之间提供双向通信,WebSocket始终保持打开状态,因此它们允许实时数据传输。当客户端向服务器触发请求时,它不会在接收到响应时关闭连接,而是会继续存在,并等待客户端或服务器终止请求。
Socket.IO 是一个库,可用于在客户端和Web服务器之间进行实时和全双工通信。它使用WebSocket协议提供接口。
1、引入依赖
如果是gradle
compile 'com.corundumstudio.socketio:netty-socketio:1.7.18'
如果是maven
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.18</version>
</dependency>
2、定义配置类
package com.iscas.base.biz.config.socketio;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Optional;
/**
* socket.io
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/3/25 8:51
* @since jdk1.8
*/
//@Configuration
public class SocketioConfig {
@Value("${socket.io.port:8974}")
private int socketIoPort;
@Value("${socket.io.namespaces}")
private String[] namespaces;
@Bean
public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.Configuration config =
new com.corundumstudio.socketio.Configuration();
config.setOrigin(null); // 注意如果开放跨域设置,需要设置为null而不是"*"
config.setPort(socketIoPort);
config.setSocketConfig(new SocketConfig());
config.setWorkerThreads(100);
config.setAuthorizationListener(handshakeData -> true);
//允许最大帧长度
config.setMaxFramePayloadLength(1024 * 1024);
//允许下最大内容
config.setMaxHttpContentLength(1024 * 1024);
final SocketIOServer server = new SocketIOServer(config);
Optional.ofNullable(namespaces).ifPresent(nss ->
Arrays.stream(nss).forEach(server::addNamespace));
// server.start();
return server;
}
/**
* 注入OnConnect,OnDisconnect,OnEvent注解。 不写的话Spring无法扫描OnConnect,OnDisconnect等注解
* */
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketIOServer){
return new SpringAnnotationScanner(socketIOServer);
}
}
3、配置文件内配置端口和命名空间
#########socket.io相关配置##################
socket.io.port=8974
#命名空间,多个以逗号分隔,每个空间需要对应一个Bean的名字,XXXMessageEventHandler,如chatMessageEventHandler
socket.io.namespaces=/chat,/test
4、定义事件处理接口和默认实现
连接的处理使用前端传入Authorization,可以传入token,做校验。
package com.iscas.biz.socketio;
import com.corundumstudio.socketio.SocketIOClient;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
/**
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/3/25 10:15
* @since jdk1.8
*/
public interface IEventHandler {
void onConnect(SocketIOClient client);
void onDisConnect(SocketIOClient client);
default void connect(SocketIOClient client) {
// if (!client.getNamespace().getName().equals("/chat")) {
client.disconnect();
// return;
// }
String token = client.getHandshakeData().getSingleUrlParam("Authorization");
if (token == null) {
System.err.println("客户端" + client.getSessionId() + "建立websocket连接失败,Authorization不能为null");
client.disconnect();
return;
}
Map header = new HashMap<>();
header.put("Authorization", token);
String username = null;
// try {
// Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
// username = claimMap.get("username").asString();
// if (username == null) {
// throw new RuntimeException("websocket认证失败");
// }
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// throw new RuntimeException("websocket认证失败", e);
// } catch (ValidTokenException e) {
// e.printStackTrace();
// throw new RuntimeException("websocket认证失败", e);
// }
username = token;
if (username != null) {
System.out.println("客户端" + client.getSessionId() + "建立websocket连接成功");
//将用户名和clientId对应 方便推送时候使用
SocketIOStaticInfo.userClientIdMap.put(username, client.getSessionId());
} else {
System.err.println("客户端" + client.getSessionId() + "建立websocket连接失败");
client.disconnect();
}
}
default void disconnect(SocketIOClient client) {
System.out.println("客户端" + client.getSessionId() + "断开websocket连接成功");
//移除
for (Map.Entry<String, UUID> entry : SocketIOStaticInfo.userClientIdMap.entrySet()) {
if (Objects.equals(entry.getValue(), client.getSessionId())) {
SocketIOStaticInfo.userClientIdMap.remove(entry.getKey());
}
}
}
}
5、定义启动类
这里为每个命名空间注册一个事件处理Handler, 规则为bean的name为命名空间名称+MessageHandler。
package com.iscas.base.biz.config;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.iscas.base.biz.service.common.SpringService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Optional;
/**
* @author zhuquanwen
* @vesion 1.0
* @date 2021/3/25 10:07
* @since jdk1.8
*/
@Component
public class ServerRunner implements CommandLineRunner {
@Autowired(required = false)
private SocketIOServer socketIOServer;
@Value("${socket.io.namespaces}")
private String[] namespaces;
@Override
public void run(String... args) throws Exception {
if (socketIOServer != null) {
/*Optional.ofNullable(SpringService.getBean("messageEventHandler"))
.ifPresent(handler -> socketIOServer.getNamespace("/").addListeners(handler));*/
Optional.ofNullable(namespaces).ifPresent(nss ->
Arrays.stream(nss).forEach(ns -> {
//获取命名空间
SocketIONamespace socketIONamespace = socketIOServer.getNamespace(ns);
//获取期待的类名
String className = ns.substring(1) + "MessageEventHandler";
try {
Object bean = SpringService.getBean(className);
Optional.ofNullable(bean).ifPresent(socketIONamespace::addListeners);
} catch (Exception e) {
}
}));
// socketIOServer.getNamespace("/chat").addListeners(messageEventHandler);
socketIOServer.start();
}
}
}
6、几种消息接收和发送模式
(1)点对点消息
@OnEvent(value = "aaaa")
public void onEvent(SocketIOClient client, AckRequest request, String data) {
log.debug("发来消息:" + data);
UUID sessionId = client.getSessionId();
socketIOServer.getNamespace(namespace).getClient(sessionId).sendEvent("bbbb", "点对点消息的返回" + Math.random());
}
(2)房间消息
/**
* 测试加入房间
* */
@OnEvent(value = "joinRoom")
public void onTestJoinRoomEvent(SocketIOClient client, AckRequest request, String data) {
client.leaveRoom(data);
client.joinRoom(data);
}
/**
* 测试房间发送信息(类似于订阅式广播消息)
* */
@OnEvent(value = "testRoom")
public void onTestRoomEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getRoomOperations("room1").sendEvent("testRoom", "房间里的消息" + Math.random());
}
(3) 广播消息
/**
* 测试发送广播消息
* */
@OnEvent(value = "testBroadcast")
public void onTestBroadcastEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getBroadcastOperations().sendEvent("testBroadcast", "广播的消息" + Math.random());
}
7、完整消息处理器
chat 和test
package com.iscas.biz.socketio;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.iscas.common.tools.core.random.RandomStringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author zhuquanwen
* @vesion 1.0
* @date 2018/7/22 18:32
* @since jdk1.8
*/
@Component(value= "chatMessageEventHandler")
@ConditionalOnClass(SocketIOServer.class)
@Slf4j
public class ChatMessageEventHandler implements IEventHandler {
private final SocketIOServer socketIOServer;
private static int testPushCount = 0;
private String namespace = "/chat";
public ChatMessageEventHandler(SocketIOServer socketIOServer) {
this.socketIOServer = socketIOServer;
}
@Override
@OnConnect
public void onConnect(SocketIOClient client) {
connect(client);
}
@Override
@OnDisconnect
public void onDisConnect(SocketIOClient client) {
disconnect(client);
}
@OnEvent(value = "aaaa")
public void onEvent(SocketIOClient client, AckRequest request, String data) {
log.debug("发来消息:" + data);
UUID sessionId = client.getSessionId();
socketIOServer.getNamespace(namespace).getClient(sessionId).sendEvent("bbbb", "点对点消息的返回" + Math.random());
}
/**
* 测试无限推送
* */
@OnEvent(value = "testPush")
public void onTestPushEvent(SocketIOClient client, AckRequest request, String data) {
UUID sessionId = client.getSessionId();
Runnable runnable = () -> {
testPushCount++;
int thisTestPushCount = testPushCount;
for (; ; ) {
if (thisTestPushCount < testPushCount) {
break;
}
socketIOServer.getNamespace(namespace).getClient(sessionId).sendEvent("testPush", RandomStringUtils.randomStr(1024 * 200));
try {
TimeUnit.MILLISECONDS.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(runnable).start();
}
/**
* 测试加入房间
* */
@OnEvent(value = "joinRoom")
public void onTestJoinRoomEvent(SocketIOClient client, AckRequest request, String data) {
client.leaveRoom(data);
client.joinRoom(data);
}
/**
* 测试房间发送信息(类似于订阅式广播消息)
* */
@OnEvent(value = "testRoom")
public void onTestRoomEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getRoomOperations("room1").sendEvent("testRoom", "房间里的消息" + Math.random());
}
/**
* 测试发送广播消息
* */
@OnEvent(value = "testBroadcast")
public void onTestBroadcastEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getBroadcastOperations().sendEvent("testBroadcast", "广播的消息" + Math.random());
}
}
package com.iscas.biz.socketio;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.iscas.common.tools.core.random.RandomStringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author zhuquanwen
* @vesion 1.0
* @date 2018/7/22 18:32
* @since jdk1.8
*/
@Component(value= "testMessageEventHandler")
@ConditionalOnClass(SocketIOServer.class)
@Slf4j
public class TestMessageEventHandler implements IEventHandler {
private final SocketIOServer socketIOServer;
private static int testPushCount = 0;
private String namespace = "/test";
public TestMessageEventHandler(SocketIOServer socketIOServer) {
this.socketIOServer = socketIOServer;
}
@Override
@OnConnect
public void onConnect(SocketIOClient client) {
connect(client);
}
@Override
@OnDisconnect
public void onDisConnect(SocketIOClient client) {
disconnect(client);
}
@OnEvent(value = "aaaa")
public void onEvent(SocketIOClient client, AckRequest request, String data) {
log.debug("发来消息:" + data);
UUID sessionId = client.getSessionId();
socketIOServer.getNamespace(namespace).getClient(sessionId).sendEvent("bbbb", "点对点消息的返回" + Math.random());
}
/**
* 测试无限推送
* */
@OnEvent(value = "testPush")
public void onTestPushEvent(SocketIOClient client, AckRequest request, String data) {
UUID sessionId = client.getSessionId();
Runnable runnable = () -> {
testPushCount++;
int thisTestPushCount = testPushCount;
for (; ; ) {
if (thisTestPushCount < testPushCount) {
break;
}
socketIOServer.getNamespace(namespace).getClient(sessionId).sendEvent("testPush", RandomStringUtils.randomStr(1024 * 200));
try {
TimeUnit.MILLISECONDS.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(runnable).start();
}
/**
* 测试加入房间
* */
@OnEvent(value = "joinRoom")
public void onTestJoinRoomEvent(SocketIOClient client, AckRequest request, String data) {
client.leaveRoom(data);
client.joinRoom(data);
}
/**
* 测试房间发送信息(类似于订阅式广播消息)
* */
@OnEvent(value = "testRoom")
public void onTestRoomEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getRoomOperations("room1").sendEvent("testRoom", "房间里的消息" + Math.random());
}
/**
* 测试发送广播消息
* */
@OnEvent(value = "testBroadcast")
public void onTestBroadcastEvent(SocketIOClient client, AckRequest request, String data) {
socketIOServer.getNamespace(namespace).getBroadcastOperations().sendEvent("testBroadcast", "广播的消息" + Math.random());
}
}
8、前端代码
chat:
<html>
<head>
<meta charset="utf-8"/>
<!--<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.0/socket.io.js"></script>-->
<script src="socket.io-2.1.0.js"></script>
<script>
var socket = null;
var i = 0;
window.onload = function(){
initSocket();
}
function getCookie(c_name)
{
if (document.cookie.length>0)
{
c_start=document.cookie.indexOf(c_name + "=")
if (c_start!=-1)
{
c_start=c_start + c_name.length+1
c_end=document.cookie.indexOf(";",c_start)
if (c_end==-1) c_end=document.cookie.length
return unescape(document.cookie.substring(c_start,c_end))
}
}
return ""
}
function initSocket(){
socket = io('http://localhost:8974/chat?Authorization=' + new Date()); //正式发布环境
// socket = io('http://localhost:8974/test'); //正式发布环境
socket.on('connect', function () {
console.log('socket连接成功');
});
socket.on('disconnect', function () {
console.log('socket断开连接');
});
//==============以下使用命名空间chat========================
//监听广播消息
socket.on('testNamespace', function (data) {
console.log("接收到消息:" + data);
});
//监听点对点消息
socket.on('bbbb', function (data) {
//....收到消息后具体操作
// console.log(data);
console.log(data);
});
//监听后端无限推送的点对点消息
socket.on('testPush', function (data) {
console.log("接收到消息的次数:" + ++i);
});
//监听加入房间的反馈
socket.on('testJoinRoom', function (data) {
console.log("接收到消息:" + data);
});
//监听房间消息
socket.on('testRoom', function (data) {
console.log("接收到消息:" + data);
});
//监听广播消息
socket.on('testBroadcast', function (data) {
console.log("接收到消息:" + data);
});
}
//发送点对点消息
function send(){
socket.emit('aaaa', "aaaaaa");
}
//触发无限推送
function send2(){
socket.emit('testPush', "begin");
}
//发送加入房间消息
function send3(){
socket.emit('joinRoom', "room1");
}
//发送房间消息
function send4(){
socket.emit('testRoom', "testRoomData");
}
//发送广播消息
function send5(){
socket.emit('testBroadcast', "testBroadCastData");
}
</script>
<input type="button" value="发送点对点消息" onclick="send();">
<br/><br/>
<input type="button" value="开启无限推送测试" onclick="send2();">
<br/><br/>
<input type="button" value="测试加入房间" onclick="send3();">
<br/><br/>
<input type="button" value="测试房间内发消息" onclick="send4();">
<br/><br/>
<input type="button" value="测试发送广播消息" onclick="send5();">
<br/><br/>
</head>
</html>
test:
<html>
<head>
<meta charset="utf-8"/>
<!--<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.0/socket.io.js"></script>-->
<script src="socket.io-2.1.0.js"></script>
<script>
var socket = null;
var i = 0;
window.onload = function(){
initSocket();
}
function getCookie(c_name)
{
if (document.cookie.length>0)
{
c_start=document.cookie.indexOf(c_name + "=")
if (c_start!=-1)
{
c_start=c_start + c_name.length+1
c_end=document.cookie.indexOf(";",c_start)
if (c_end==-1) c_end=document.cookie.length
return unescape(document.cookie.substring(c_start,c_end))
}
}
return ""
}
function initSocket(){
socket = io('http://localhost:8974/test?Authorization=' + new Date()); //正式发布环境
// socket = io('http://localhost:8974/test'); //正式发布环境
socket.on('connect', function () {
console.log('socket连接成功');
});
socket.on('disconnect', function () {
console.log('socket断开连接');
});
//==============以下使用命名空间test========================
//监听广播消息
socket.on('testNamespace', function (data) {
console.log("接收到消息:" + data);
});
//监听点对点消息
socket.on('bbbb', function (data) {
//....收到消息后具体操作
// console.log(data);
console.log(data);
});
//监听后端无限推送的点对点消息
socket.on('testPush', function (data) {
console.log("接收到消息的次数:" + ++i);
});
//监听加入房间的反馈
socket.on('testJoinRoom', function (data) {
console.log("接收到消息:" + data);
});
//监听房间消息
socket.on('testRoom', function (data) {
console.log("接收到消息:" + data);
});
//监听广播消息
socket.on('testBroadcast', function (data) {
console.log("接收到消息:" + data);
});
}
//发送点对点消息
function send(){
socket.emit('aaaa', "aaaaaa");
}
//触发无限推送
function send2(){
socket.emit('testPush', "begin");
}
//发送加入房间消息
function send3(){
socket.emit('joinRoom', "room1");
}
//发送房间消息
function send4(){
socket.emit('testRoom', "testRoomData");
}
//发送广播消息
function send5(){
socket.emit('testBroadcast', "testBroadCastData");
}
</script>
<input type="button" value="发送点对点消息" onclick="send();">
<br/><br/>
<input type="button" value="开启无限推送测试" onclick="send2();">
<br/><br/>
<input type="button" value="测试加入房间" onclick="send3();">
<br/><br/>
<input type="button" value="测试房间内发消息" onclick="send4();">
<br/><br/>
<input type="button" value="测试发送广播消息" onclick="send5();">
<br/><br/>
</head>
</html>
9、SocketIOStaticInfo
package com.iscas.biz.socketio;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* 一些静态变量
*
* @author zhuquanwen
* @vesion 1.0
* @date 2018/7/22 21:26
* @since jdk1.8
*/
public class SocketIOStaticInfo {
private SocketIOStaticInfo(){}
/**用户名和websocket clientId 对应关系*/
public static Map<String, UUID> userClientIdMap = new ConcurrentHashMap<>();
}
10、定义一个启用开关
package com.iscas.base.biz.aop.enable;
import com.iscas.base.biz.config.socketio.SocketioConfig;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
* <p>socketIo服务器开关</>
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/3/26 10:14
* @since jdk1.8
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(SocketioConfig.class)
public @interface EnableSocketio {
}
在启动类上加上@EnableSocketio注解