1.websocket使用
1.1依赖
<websocket.version>1.1</websocket.version>
<!-- websocket,需要springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>${websocket.version}</version>
<scope>provided</scope>
</dependency>
1.2 component 服务
package com.mods.browser.component;
import com.alibaba.fastjson.JSON;
import com.mods.mbg.mapper.AdminMapper;
import com.mods.mbg.mapper.MessageMapper;
import com.mods.mbg.model.Admin;
import com.mods.mbg.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
@Slf4j
@ServerEndpoint("/admin/websocket/{param}") //前端对接地址,param传用户id
@Component
public class AdminWebSocketServer {
//注意,不要直接用mapper,用不了。套一层service才行。这里是为了简单看
public static MessageMapper messageMapper;
public static AdminMapper adminMapper;
// 记录空闲Session集合
private static CopyOnWriteArraySet<Session> idle = new CopyOnWriteArraySet<Session>();
// 记录正在使用中Session集合,value为Future,表示使用情况
private static ConcurrentHashMap<Session, Future<Void>> busy = new ConcurrentHashMap<Session, Future<Void>>();
//存放所有在线的客户端session
private static Map<String, Session> clientSession = new ConcurrentHashMap<>();
//存放所有在线的客户端信息
private static Map<String, Admin> clientUser = new ConcurrentHashMap<>();
/**
* 客户端打开
*/
@OnOpen
public void onOpen(Session session, @PathParam("param") String id) {
Integer adminId = Integer.valueOf(id);
String sessionId = session.getId();
log.info("有新的客户端进来sessionId={}", sessionId);
Admin admin = adminMapper.selectById(adminId);
if (StringUtils.isNotBlank(sessionId) && admin != null) {
//更新用户的sessionId
Admin bean = new Admin();
bean.setId(adminId);
bean.setSessionId(sessionId);
adminMapper.updateById(bean);
/**
* 注意这里不要直接用mapper的方法,可以套一层,用service,service里放mapper、直接使用服务会无法启动,具体原因我不懂
*/
//将用户存入在线组和存入用户信息
clientSession.put(sessionId, session);
clientUser.put(sessionId, admin);
//判断是否在线
for (Map.Entry<String, Admin> adminEntry : clientUser.entrySet()) {
Admin onlineAdmin = adminEntry.getValue();
if (onlineAdmin.getId() == adminId) {
Message message = new Message();
message.setCreateTime(new Date());
message.setContent("已在其他设备登陆,强制下线!");
message.setType("4");//3-无需存储消息
message.setLaunchId(adminId);
message.setReceiveId(null);
String receiveSessionId = onlineAdmin.getSessionId();
sendTo(message, receiveSessionId);
break;
}
}
//如果不在线就新增
open(session);
log.info("sessionId={}的用户上线!", sessionId);
}
}
/**
* 客户端关闭
*/
@OnClose
public void onClose(Session session) {
String sessionId = session.getId();
log.info("sessionId={}的用户离线", sessionId);
//将掉线的用户移除
clientSession.remove(sessionId);
clientUser.remove(sessionId);
close(session);
}
/**
* 调用此方法以发送消息
*/
@OnMessage
public void onMessage(String content) {
log.info("收到消息:" + content);
Message bean = JSON.parseObject(content, Message.class);
bean.setCreateTime(new Date());
String type = bean.getType();
String receiveIds = bean.getReceiveIds();
if (StringUtils.isBlank(type)) {
log.info("消息类型未指定,发送执行停止!");
return;
}
if ("1".equals(type)) {//1-系统消息(接受人:所有人,无需指定用户)
bean.setTitle("系统消息");
bean.setLaunchId(0);
bean.setSeeStatus("1");//默认已读
bean.setReceiveId(null);
sendAll(bean);
} else if ("2".equals(type)) {//2-通知消息(接收人:单个)
bean.setTitle("消息通知");
bean.setSeeStatus("0");//默认未读
Integer adminId = bean.getReceiveId();
sendByAdminId(bean, adminId);
} else if ("3".equals(type) || "4".equals(type)) {//3-房间消息,4-不存储的消息,receiveIds必传,
bean.setLaunchId(0);
bean.setTitle("3".equals(type) ? "房间消息" : "其他消息");
bean.setSeeStatus("1");//默认已读
String[] arr = receiveIds.split(";");
if (arr == null || arr.length == 0) {
log.info("消息接口人:{}解析不到,发送执行停止", arr.toString());
return;
}
for (String receiveId : arr) {
Integer adminId = Integer.valueOf(receiveId);
sendByAdminId(bean, adminId);
}
}
}
/**
* 抽取方法:根据id和消息体发送消息
*/
public void sendByAdminId(Message message, Integer adminId) {
if (adminId == null) {
log.info("消息接收人为空,发送执行停止");
return;
}
//根据id查询接收人sessionId
Admin admin = adminMapper.selectById(adminId);
if (admin != null) {
String sessionId = admin.getSessionId();
if (StringUtils.isNotBlank(sessionId)) {
message.setReceiveId(adminId);
sendTo(message, sessionId);
}
}
}
/**
* 发送消息给指定用户
*/
public void sendTo(Message message, String sessionId) {
Session receiveSession = clientSession.get(sessionId);
if (receiveSession != null) {
try {
send(receiveSession, message, 3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
message.setSendStatus("1");//发送成功
} else {
message.setSendStatus("0");//发送失败
log.info("sessionId={}的用户不在线!", sessionId);
}
saveMessage(message);
}
/**
* 群发消息
*/
private void sendAll(Message message) {
message.setSendStatus("1");//发送成功
for (Map.Entry<String, Session> sessionEntry : clientSession.entrySet()) {
Session session = sessionEntry.getValue();
try {
send(session, message, 3000);
} catch (InterruptedException e) {
message.setSendStatus("0");//发送失败
e.printStackTrace();
}
}
saveMessage(message);
}
/**
* 保存消息
*/
private void saveMessage(Message message) {
String type = message.getType();
if (!type.equals("4")) {
messageMapper.insert(message);
}
}
/**
* 发生错误
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
/**
* 新增session
*/
public static void open(Session session) {
idle.add(session);
}
/**
* 关闭session
*/
public static void close(Session session) {
idle.remove(session);
busy.remove(session);
}
/**
* 使用session发送消息
*/
public static void send(Session session, Message message, Integer timeout) throws InterruptedException {
//timeout后放弃本次发送
if (timeout < 0) {
return;
}
//判断session是否空闲,抢占式
if (idle.remove(session)) {
busy.put(session, session.getAsyncRemote().sendText(JSON.toJSONString(message)));
} else {
//若session当前不在idle集合,则去busy集合中查看session上次是否已经发送完毕,即采用惰性判断
synchronized (busy) {
if (busy.containsKey(session) && busy.get(session).isDone()) {
busy.remove(session);
idle.add(session);
}
}
//重试
Thread.sleep(100);
send(session, message, timeout - 100);
}
}
/**
* 判断字符串是否是数字类型
*
* @param s
* @return
*/
private boolean isNumeric(String s) {
if (StringUtils.isBlank(s)) {
return s.matches("^[0-9]*$");
} else {
return false;
}
}
}
1.3 配置类
package com.mods.browser.config;
import com.mods.browser.component.AdminWebSocketServer;
import com.mods.mbg.mapper.AdminMapper;
import com.mods.mbg.mapper.MessageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class MyWebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Autowired
public void socketMessageService(AdminMapper adminMapper) {
AdminWebSocketServer.adminMapper = adminMapper;
}
@Autowired
public void socketMessageService(MessageMapper messageMapper) {
AdminWebSocketServer.messageMapper = messageMapper;
}
}
1.4 前端Vue
<template>
<i-card>
<div>
<h1>测试webSocket</h1>
<button @click="send">点击请求后台数据</button>
<button @click="onClose">关闭连接</button>
</div>
</i-card>
</template>
<script>
export default {
created() {
// 页面创建生命周期函数
this.initWebSocket()
},
methods: {
onClose() {
this.websock.close()
},
initWebSocket() {
if ('WebSocket' in window) {
// WebSocket与普通的请求所用协议有所不同,ws等同于http,wss等同于https
//1为用户admin_id,相当于我这台客户端的标识,key,用来单客户端通话
this.websock = new WebSocket('ws://localhost:6333/admin/websocket/1')
this.websock.onopen = this.websocketonopen
this.websock.onerror = this.websocketonerror
this.websock.onmessage = this.websocketonmessage
this.websock.onclose = this.websocketclose
} else {
alert('not error 不支持websocket')
}
},
//连接回调
websocketonopen() {
console.log('WebSocket连接成功')
},
//发生错误回调
websocketonerror(e) {
console.log('WebSocket连接发生错误')
},
//接收到消息的回调函数
websocketonmessage(e) {
console.log(321312)
console.log(e.data) // console.log(e);
},
// 连接关闭时的回调函数
websocketclose(e) {
console.log('connection closed (' + e + ')')
},
send() {
this.websock.send('后台你好,我是前端')
},
},
}
</script>
<style lang="less" scoped>
</style>