springboot-websocket

林魁
2023-12-01

1.websocket使用

1.1依赖

  • 版本号在父工程中指定:
<websocket.version>1.1</websocket.version>
  • 在当前使用的工程中引入以下依赖:
        <!-- WebSocket support; requires Spring Boot. -->
        <!-- No <version> is declared here, so it must be supplied by inherited dependency management
             (presumably the Spring Boot parent/BOM; confirm in the parent pom). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Standard javax.websocket API. The version is read from the websocket.version property;
             "provided" scope means the jar is needed to compile but is expected to be supplied
             by the runtime container rather than packaged with the application. -->
        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <version>${websocket.version}</version>
            <scope>provided</scope>
        </dependency>

1.2 component 服务

package com.mods.browser.component;

import com.alibaba.fastjson.JSON;
import com.mods.mbg.mapper.AdminMapper;
import com.mods.mbg.mapper.MessageMapper;
import com.mods.mbg.model.Admin;
import com.mods.mbg.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;

/**
 * WebSocket endpoint for admin clients, exposed at {@code /admin/websocket/{param}} where
 * {@code param} is the admin id.
 *
 * <p>Online sessions are tracked in static maps keyed by the WebSocket session id. Incoming
 * {@link Message}s are routed by their {@code type}, and every message whose type is not
 * {@code "4"} is stored through {@link #messageMapper}.
 *
 * <p>The two mapper fields are public and static because they are assigned from outside this
 * class (see {@code MyWebSocketConfig}); they are dereferenced without a null check, so they
 * must be set before the first client connects.
 */
@Slf4j
@ServerEndpoint("/admin/websocket/{param}") // address the front end connects to; {param} carries the admin id
@Component
public class AdminWebSocketServer {

    // NOTE (original author): do not use the mappers directly like this in real code, it does not
    // work; wrap them in a service. They are used directly here only to keep the example short.
    public static MessageMapper messageMapper;

    public static AdminMapper adminMapper;

    /** Pause between two attempts to claim a session that is still busy, in milliseconds. */
    private static final int RETRY_INTERVAL_MILLIS = 100;

    /** Longest time spent trying to hand one message to one session, in milliseconds. */
    private static final int SEND_TIMEOUT_MILLIS = 3000;

    /** Message type that is delivered but never stored. */
    private static final String TYPE_NOT_STORED = "4";

    // Sessions that currently have no send in flight.
    private static final CopyOnWriteArraySet<Session> idle = new CopyOnWriteArraySet<>();

    // Sessions with a send in flight; the Future reports whether that send has finished.
    private static final ConcurrentHashMap<Session, Future<Void>> busy = new ConcurrentHashMap<>();

    // Sessions of all online clients, keyed by WebSocket session id.
    private static final Map<String, Session> clientSession = new ConcurrentHashMap<>();

    // Admin records of all online clients, keyed by WebSocket session id.
    private static final Map<String, Admin> clientUser = new ConcurrentHashMap<>();

    /**
     * Handles a newly opened client connection.
     *
     * <p>Stores the new session id on the admin record, tells every other online session of
     * the same admin that it has been replaced, and then registers the new session. A
     * connection whose id is not a valid number, or does not belong to a known admin, is
     * left unregistered.
     *
     * @param session the session that was just opened
     * @param id      the admin id taken from the {@code {param}} path segment
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("param") String id) {
        String sessionId = session.getId();
        log.info("有新的客户端进来sessionId={}", sessionId);

        Integer adminId = parseAdminId(id);
        if (adminId == null) {
            log.warn("sessionId={}的连接携带了非法的用户id:{},不予登记", sessionId, id);
            return;
        }
        Admin admin = adminMapper.selectById(adminId);
        if (StringUtils.isBlank(sessionId) || admin == null) {
            return;
        }

        // Persist the session id of the newest connection for this admin.
        // NOTE (original author): do not call mapper methods directly here in real code; put the
        // mapper behind a service. Using it directly reportedly prevents the application from
        // starting (cause not known to the original author).
        Admin bean = new Admin();
        bean.setId(adminId);
        bean.setSessionId(sessionId);
        adminMapper.updateById(bean);
        // Keep the cached copy in step with what was just written.
        admin.setSessionId(sessionId);

        // Notify older sessions before registering this one, so the scan can never match
        // the connection that is being opened right now.
        notifyForcedOffline(adminId, sessionId);

        clientSession.put(sessionId, session);
        clientUser.put(sessionId, admin);
        open(session);
        log.info("sessionId={}的用户上线!", sessionId);
    }

    /**
     * Sends the "logged in elsewhere" notice to every online session of {@code adminId}
     * other than {@code currentSessionId}.
     *
     * <p>The target is the key of {@link #clientUser}, i.e. the session id the client is
     * actually connected with, not the session id stored on the cached {@link Admin}.
     */
    private void notifyForcedOffline(Integer adminId, String currentSessionId) {
        for (Map.Entry<String, Admin> entry : clientUser.entrySet()) {
            String onlineSessionId = entry.getKey();
            // equals(), not ==: a boxed id would otherwise be compared by reference.
            if (onlineSessionId.equals(currentSessionId) || !adminId.equals(entry.getValue().getId())) {
                continue;
            }
            Message message = new Message();
            message.setCreateTime(new Date());
            message.setContent("已在其他设备登陆,强制下线!");
            message.setType(TYPE_NOT_STORED); // "4": delivered only, not stored
            message.setLaunchId(adminId);
            message.setReceiveId(null);
            sendTo(message, onlineSessionId);
        }
    }

    /**
     * Handles a closed client connection by removing the session from all bookkeeping.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        String sessionId = session.getId();
        log.info("sessionId={}的用户离线", sessionId);
        clientSession.remove(sessionId);
        clientUser.remove(sessionId);
        close(session);
    }

    /**
     * Handles a text frame from a client and forwards it according to its {@code type}.
     *
     * <ul>
     *   <li>{@code "1"}: system message, sent to every online client;</li>
     *   <li>{@code "2"}: notification for the single admin in {@code receiveId};</li>
     *   <li>{@code "3"} / {@code "4"}: room message / non-stored message for the admins listed
     *       in {@code receiveIds}, separated by {@code ;}.</li>
     * </ul>
     *
     * <p>Content that is not valid JSON, is empty, has no type, or has no usable recipients is
     * logged and dropped instead of raising an exception.
     *
     * @param content the JSON representation of a {@link Message}
     */
    @OnMessage
    public void onMessage(String content) {
        log.info("收到消息:{}", content);
        Message bean;
        try {
            bean = JSON.parseObject(content, Message.class);
        } catch (RuntimeException e) {
            // fastjson reports malformed input with an unchecked exception.
            log.warn("消息不是合法的JSON,发送执行停止:{}", content, e);
            return;
        }
        if (bean == null) {
            log.warn("消息内容为空,发送执行停止!");
            return;
        }
        bean.setCreateTime(new Date());
        String type = bean.getType();
        String receiveIds = bean.getReceiveIds();
        if (StringUtils.isBlank(type)) {
            log.info("消息类型未指定,发送执行停止!");
            return;
        }
        if ("1".equals(type)) {
            bean.setTitle("系统消息");
            bean.setLaunchId(0);
            bean.setSeeStatus("1"); // read by default
            bean.setReceiveId(null);
            sendAll(bean);
        } else if ("2".equals(type)) {
            bean.setTitle("消息通知");
            bean.setSeeStatus("0"); // unread by default
            sendByAdminId(bean, bean.getReceiveId());
        } else if ("3".equals(type) || TYPE_NOT_STORED.equals(type)) {
            bean.setLaunchId(0);
            bean.setTitle("3".equals(type) ? "房间消息" : "其他消息");
            bean.setSeeStatus("1"); // read by default
            if (StringUtils.isBlank(receiveIds)) {
                log.info("消息接口人:{}解析不到,发送执行停止", receiveIds);
                return;
            }
            for (String receiveId : receiveIds.split(";")) {
                Integer adminId = parseAdminId(receiveId);
                if (adminId == null) {
                    // One malformed id must not stop delivery to the remaining recipients.
                    log.warn("消息接收人id非法:{},已跳过", receiveId);
                    continue;
                }
                sendByAdminId(bean, adminId);
            }
        } else {
            log.info("未知的消息类型:{},消息未发送", type);
        }
    }

    /**
     * Sends a message to the admin with the given id, using the session id stored on the
     * admin record. Does nothing when the id is {@code null}, the admin does not exist, or
     * the admin has no session id.
     *
     * @param message the message to send; its {@code receiveId} is set to {@code adminId}
     * @param adminId the id of the receiving admin, may be {@code null}
     */
    public void sendByAdminId(Message message, Integer adminId) {
        if (adminId == null) {
            log.info("消息接收人为空,发送执行停止");
            return;
        }
        Admin admin = adminMapper.selectById(adminId);
        if (admin == null) {
            return;
        }
        String sessionId = admin.getSessionId();
        if (StringUtils.isNotBlank(sessionId)) {
            message.setReceiveId(adminId);
            sendTo(message, sessionId);
        }
    }

    /**
     * Sends a message to one session and then stores it (unless its type is {@code "4"}).
     *
     * <p>{@code sendStatus} is set to {@code "1"} only when the message was actually handed to
     * the session; an offline receiver, a timeout, or an interruption yields {@code "0"}.
     *
     * @param message   the message to send
     * @param sessionId the WebSocket session id of the receiver, may be {@code null}
     */
    public void sendTo(Message message, String sessionId) {
        // ConcurrentHashMap rejects null keys, so a missing id is treated as "offline".
        Session receiveSession = sessionId == null ? null : clientSession.get(sessionId);
        if (receiveSession != null) {
            message.setSendStatus(deliver(receiveSession, message) ? "1" : "0");
        } else {
            message.setSendStatus("0");
            log.info("sessionId={}的用户不在线!", sessionId);
        }
        saveMessage(message);
    }

    /**
     * Sends a message to every online session and then stores it once. {@code sendStatus} is
     * {@code "1"} only when every session accepted the message.
     */
    private void sendAll(Message message) {
        boolean allDelivered = true;
        for (Session session : clientSession.values()) {
            if (!deliver(session, message)) {
                allDelivered = false;
                if (Thread.currentThread().isInterrupted()) {
                    // The thread was asked to stop; do not start further blocking sends.
                    break;
                }
            }
        }
        message.setSendStatus(allDelivered ? "1" : "0");
        saveMessage(message);
    }

    /**
     * Tries to hand the message to the session within {@link #SEND_TIMEOUT_MILLIS}.
     *
     * @return {@code true} if the send was started, {@code false} on timeout, closed session,
     *         or interruption (the interrupt flag is restored in that case)
     */
    private static boolean deliver(Session session, Message message) {
        try {
            return trySend(session, message, SEND_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("向sessionId={}发送消息时线程被中断", session.getId(), e);
            return false;
        }
    }

    /**
     * Stores the message unless its type is {@code "4"}. A message without a type is stored.
     */
    private void saveMessage(Message message) {
        // Constant-first comparison so that a missing type cannot raise a NullPointerException.
        if (!TYPE_NOT_STORED.equals(message.getType())) {
            messageMapper.insert(message);
        }
    }

    /**
     * Logs an error reported for a connection.
     *
     * @param throwable the reported error
     */
    @OnError
    public void onError(Throwable throwable) {
        log.error("WebSocket发生错误", throwable);
    }

    /**
     * Registers a session as idle, i.e. available for sending.
     *
     * @param session the session to register
     */
    public static void open(Session session) {
        idle.add(session);
    }

    /**
     * Removes a session from both the idle and the busy bookkeeping.
     *
     * @param session the session to forget
     */
    public static void close(Session session) {
        idle.remove(session);
        busy.remove(session);
    }

    /**
     * Sends a message over the session, retrying every {@value #RETRY_INTERVAL_MILLIS} ms while
     * the session is still busy with a previous send. Gives up silently once the timeout is
     * used up or when the session is no longer open.
     *
     * @param session the target session
     * @param message the message, serialized to JSON before sending
     * @param timeout the retry budget in milliseconds; a negative value sends nothing
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static void send(Session session, Message message, Integer timeout) throws InterruptedException {
        trySend(session, message, timeout);
    }

    /**
     * Iterative implementation behind {@link #send(Session, Message, Integer)}.
     *
     * @return {@code true} if the asynchronous send was started, {@code false} otherwise
     */
    private static boolean trySend(Session session, Message message, int timeout) throws InterruptedException {
        for (int remaining = timeout; remaining >= 0; remaining -= RETRY_INTERVAL_MILLIS) {
            if (!session.isOpen()) {
                // A closed session never becomes idle again; waiting out the timeout is pointless.
                return false;
            }
            // Claim the session: only the caller that removes it from idle may send on it.
            if (idle.remove(session)) {
                busy.put(session, session.getAsyncRemote().sendText(JSON.toJSONString(message)));
                return true;
            }
            // Not idle: lazily check whether the previous send has finished and, if so,
            // return the session to the idle set so the next attempt can claim it.
            synchronized (busy) {
                Future<Void> pending = busy.get(session);
                if (pending != null && pending.isDone()) {
                    busy.remove(session);
                    idle.add(session);
                }
            }
            Thread.sleep(RETRY_INTERVAL_MILLIS);
        }
        return false;
    }

    /**
     * Parses an admin id made of decimal digits, ignoring surrounding whitespace.
     *
     * @param raw the text to parse, may be {@code null}
     * @return the id, or {@code null} if {@code raw} is not a digit-only int value
     */
    private Integer parseAdminId(String raw) {
        String candidate = raw == null ? null : raw.trim();
        if (!isNumeric(candidate)) {
            return null;
        }
        try {
            return Integer.valueOf(candidate);
        } catch (NumberFormatException ignored) {
            // Digits only, but outside the int range.
            return null;
        }
    }

    /**
     * Tells whether a string consists solely of the digits 0-9.
     *
     * @param s the string to test, may be {@code null}
     * @return {@code true} if {@code s} is non-blank and contains only digits
     */
    private boolean isNumeric(String s) {
        return StringUtils.isNotBlank(s) && s.matches("^[0-9]*$");
    }
}

1.3 配置类

package com.mods.browser.config;

import com.mods.browser.component.AdminWebSocketServer;
import com.mods.mbg.mapper.AdminMapper;
import com.mods.mbg.mapper.MessageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration for the WebSocket endpoint.
 *
 * <p>Declares the {@link ServerEndpointExporter} bean and copies the injected mappers into the
 * static fields of {@link AdminWebSocketServer}.
 */
@Configuration
public class MyWebSocketConfig {

    /**
     * Receives the message mapper from Spring and stores it on the endpoint class.
     *
     * @param messageMapper the mapper assigned to {@code AdminWebSocketServer.messageMapper}
     */
    @Autowired
    public void socketMessageService(MessageMapper messageMapper) {
        AdminWebSocketServer.messageMapper = messageMapper;
    }

    /**
     * Receives the admin mapper from Spring and stores it on the endpoint class.
     *
     * @param adminMapper the mapper assigned to {@code AdminWebSocketServer.adminMapper}
     */
    @Autowired
    public void socketMessageService(AdminMapper adminMapper) {
        AdminWebSocketServer.adminMapper = adminMapper;
    }

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

1.4 前端Vue

<template>
  <i-card>
    <div>
      <h1>测试webSocket</h1>
      <button @click="send">点击请求后台数据</button>
      <button @click="onClose">关闭连接</button>
    </div>
  </i-card>
</template>
<script>
export default {
  created() {
    // Open the connection as soon as the component instance has been created.
    this.initWebSocket()
  },
  methods: {
    // Closes the connection; does nothing when no socket was ever created.
    onClose() {
      if (this.websock) {
        this.websock.close()
      }
    },
    // Creates the socket and wires up its four event handlers.
    initWebSocket() {
      if ('WebSocket' in window) {
        // WebSocket uses its own URL schemes: ws corresponds to http, wss to https.
        // The trailing 1 is the admin id; it identifies this client so that
        // messages can be addressed to it individually.
        this.websock = new WebSocket('ws://localhost:6333/admin/websocket/1')
        this.websock.onopen = this.websocketonopen
        this.websock.onerror = this.websocketonerror
        this.websock.onmessage = this.websocketonmessage
        this.websock.onclose = this.websocketclose
      } else {
        alert('not error  不支持websocket')
      }
    },
    // Called once the connection is established.
    websocketonopen() {
      console.log('WebSocket连接成功')
    },
    // Called when the connection reports an error.
    websocketonerror(e) {
      console.log('WebSocket连接发生错误')
    },
    // Called for every message pushed by the server.
    websocketonmessage(e) {
      console.log(e.data)
    },
    // Called when the connection is closed; logs the close code and reason
    // (concatenating the event object itself would only print "[object CloseEvent]").
    websocketclose(e) {
      console.log('connection closed (code=' + e.code + ', reason=' + e.reason + ')')
    },
    // Sends a demo message to the server.
    send() {
      // send() throws unless the socket is in the OPEN state.
      if (!this.websock || this.websock.readyState !== WebSocket.OPEN) {
        console.log('WebSocket尚未连接,无法发送')
        return
      }
      // The server parses each frame as a JSON Message and routes it by `type`,
      // so plain text cannot be handled. Type '4' is delivered to the admins in
      // `receiveIds` (separated by ';') without being stored; here the message
      // is addressed to admin 1, i.e. back to this client.
      this.websock.send(
        JSON.stringify({
          type: '4',
          receiveIds: '1',
          content: '后台你好,我是前端',
        })
      )
    },
  },
}
</script>
<style lang="less" scoped>
</style>

 类似资料: