WebSocket-Demo

包兴思
2023-12-01

暂时记个Demo,后续补上理论
js:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>

    <script>
        var socket;
        if(window.WebSocket){
            socket = new WebSocket("ws://localhost:8090/websocket")
            //接收消息
            socket.onmessage = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = rt.value+"\n"+ev.data;
            }

            //感知连接开启
            socket.onopen = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = "连接成功";
            }

            //感知连接关闭
            socket.onclose = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = rt.value+"\n"+"连接关闭";
            }
        }else {
            alert("浏览器不支持WebSocket")
        }

        function send(message) {
            if(!window.socket){
                return;
            }

            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else {
                alert("连接没有开启")
            }
        }
    </script>
</head>

<body>
<form onsubmit="return false">
    <textarea name="message" style="height: 300px;width: 300px"></textarea>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <textarea id="responseText" style="height: 300px;width: 300px"></textarea>
    <input type="button" value="清空内容" onclick="document.getElementById('responseText').value">

</form>
</body>
</html>

后端:
1.使用spring-boot-starter-websocket

package Demo_WebSocket.java_ws;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Component
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

package Demo_WebSocket.java_ws;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@ServerEndpoint("/websocket/{name}")
public class JavaWebSocket {

    /**
     *  与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;

    /**
     * 标识当前连接客户端的用户名
     */
    private String name;

    /**
     *  用于存所有的连接服务的客户端,这个对象存储是安全的
     */
    private static ConcurrentHashMap<String, JavaWebSocket> webSocketSet = new ConcurrentHashMap<>();


    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "name") String name){
        this.session = session;
        this.name = name;
        // name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过name来区分
        webSocketSet.put(name,this);
        log.info("[WebSocket] 连接成功,当前连接人数为:={}",webSocketSet.size());
    }


    @OnClose
    public void OnClose(){
        webSocketSet.remove(this.name);
        log.info("[WebSocket] 退出成功,当前连接人数为:={}",webSocketSet.size());
    }

    @OnMessage
    public void OnMessage(String message){
        log.info("[WebSocket] 收到消息:{}",message);
        //判断是否需要指定发送,具体规则自定义
        if(message.indexOf("TOUSER") == 0){
            String name = message.substring(message.indexOf("TOUSER")+6,message.indexOf(";"));
            AppointSending(name,message.substring(message.indexOf(";")+1,message.length()));
        }else{
            GroupSending(message);
        }

    }

    /**
     * 群发
     * @param message
     */
    public void GroupSending(String message){
        for (String name : webSocketSet.keySet()){
            try {
                webSocketSet.get(name).session.getBasicRemote().sendText(message);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 指定发送
     * @param name
     * @param message
     */
    public void AppointSending(String name,String message){
        try {
            webSocketSet.get(name).session.getBasicRemote().sendText(message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.netty实现
package Demo_WebSocket.netty_ws;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class MyServer {
    public static void main(String[] args) {
        //处理链接与消息
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try{
            //启动器
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //流水线(处理器容器)
                            ChannelPipeline pipeline = socketChannel.pipeline();

                            //使用http编解码器
                            pipeline.addLast(new HttpServerCodec());

                            //以块的方式写
                            pipeline.addLast(new ChunkedWriteHandler());

                            //http是分段传输,此处理器进行聚合
                            pipeline.addLast(new HttpObjectAggregator(8192));

                            //将http协议升级为ws协议,保持长连接
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));

                            //业务处理器
                            pipeline.addLast(new MyTextWebSocketFrameHandler());
                        }
                    });
            //启动服务
            ChannelFuture sync = b.bind(8090).sync();
            sync.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("启动异常");
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

package Demo_WebSocket.netty_ws;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext cx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息"+msg.text());

        //回复client
        cx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间"+ LocalDateTime.now()+msg.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id().asLongText()+"handlerAdded被调用");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id().asLongText()+"handlerRemoved被调用");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生:"+cause.getMessage());
        ctx.close();
    }
}

 类似资料: