当前位置: 首页 > 工具软件 > X-MSG-IM > 使用案例 >

Vert.x IM聊天系统设计实现

翁昊乾
2023-12-01

HttpVerticle.java

package com.xiaoniu.im.rest;

import com.xiaoniu.im.utils.Runner;
import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * http 服务处理除聊天以外的业务
 * Created by sweet on 2017/9/26.
 */
public class HttpVerticle extends AbstractVerticle {

  private static final Logger log = LoggerFactory.getLogger(HttpVerticle.class);

  /**
   * Launches this verticle on its own by handing the class to {@code Runner.runExample}.
   *
   * @param args command-line arguments; not read by this method
   */
  public static void main(String[] args) {
    Runner.runExample(HttpVerticle.class);
  }
  // Directory that holds the MapDB file; hard-coded, so it has to be edited per machine.
  private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
  // Port the HTTP server listens on.
  private static final Integer port = 8080;

  private DB db;
  private HTreeMap<String, String> userMap; // registered users: login -> JSON-encoded user record
  private LocalMap<String, Object> userMapCache; // registered users (shared in-memory copy)

  private HTreeMap<String, String> userNameAndIdMap; // user id -> login, for quick lookup
  private LocalMap<String, Object> userNameAndIdMapCache; // user id -> login, for quick lookup (in-memory copy)

  private HTreeMap<String, String> friendMap; // friend lists: user id -> JSON array of friend ids
  private LocalMap<String, Object> friendMapCache; // friend lists (shared in-memory copy)

  private HTreeMap<String, String> onlineMap; // currently logged-in users: login -> login time
  private LocalMap<String, Object> onlineMapCache; // currently logged-in users (shared in-memory copy)

  private LocalMap<String, Object> messageMapCache;  // private chat history (shared in-memory copy)
  private LocalMap<String, Object> messageGroupMapCache;  // group chat history (shared in-memory copy)

  private HTreeMap<String, String> groupMap; // groups: group id -> JSON-encoded group record
  private LocalMap<String, Object> groupMapCache; // groups (shared in-memory copy)

  private EventBus eventBus;

  /**
   * Boots the REST side of the application.
   *
   * <p>Once {@code initDB()} reports success, every route is registered on a fresh router and
   * the HTTP server is bound to {@code port}; {@code future} is completed or failed from the
   * listen callback. When the stores could not be initialised, {@code future} is failed with
   * the initialisation cause and no server is started.
   *
   * @param future completion handle for the deployment of this verticle
   */
  @Override
  public void start(Future<Void> future) throws Exception {
    eventBus = vertx.eventBus();
    initDB().setHandler(init -> {
      boolean storesReady = init.succeeded() && init.result();
      if (!storesReady) {
        future.fail(init.cause());
        return;
      }

      Router router = Router.router(vertx);
      // Every route needs the request body to be collected first.
      router.route().handler(BodyHandler.create());

      // Account endpoints: register, log in, log out (log out is provisional).
      router.post("/register").consumes("application/json").handler(this::register);
      router.post("/login").consumes("application/json").handler(this::login);
      router.get("/logout").handler(this::logout);

      // Friend endpoints: add a friend, list a user's friends.
      router.post("/api/friend").consumes("application/json").handler(this::addFriend);
      router.get("/api/friends").handler(this::queryFriends);

      // Private chat history between two users.
      router.get("/api/message").handler(this::queryMessage);

      // Number of logged-in users, and all registered users.
      router.get("/api/online").handler(this::queryUsersCount);
      router.get("/api/users").handler(this::queryUsers);

      // Group creation.
      router.post("/api/group").consumes("application/json").handler(this::createGroup);

      // Dump of every shared map, for inspecting data while testing.
      router.get("/maps").handler(this::queryAll);

      vertx.createHttpServer().requestHandler(router::accept).listen(port, server -> {
        if (server.failed()) {
          server.cause().printStackTrace();
          log.error("Http服务启动失败," + server.cause().getMessage());
          future.fail(server.cause());
          return;
        }
        log.info("Http服务启动成功");
        future.complete();
      });

      // Last-resort handler for exceptions that escape the handlers above.
      vertx.exceptionHandler(ex -> {
        ex.printStackTrace();
        log.error("异常处理器: " + ex.getMessage());
      });
    });
  }

  /**
   * Debug endpoint: answers with one JSON object holding every shared map under its field name.
   *
   * <p>NOTE(review): the maps are returned as stored, so whatever the user records contain is
   * exposed as-is; intended for test observation only.
   *
   * @param ctx routing context of the current request
   */
  private void queryAll(RoutingContext ctx) {
    JsonObject snapshot = new JsonObject()
            .put("userMapCache", userMapCache)
            .put("userNameAndIdMapCache", userNameAndIdMapCache)
            .put("friendMapCache", friendMapCache)
            .put("onlineMapCache", onlineMapCache)
            .put("messageMapCache", messageMapCache)
            .put("groupMapCache", groupMapCache)
            .put("messageGroupMapCache", messageGroupMapCache);
    resp(ctx.request(), snapshot);
  }

  /**
   * Creates a chat group from the JSON body {@code {groupName, creater, members}}.
   *
   * <p>The request is answered with 404 (empty error) when the body is missing, when any field
   * is missing, when fewer than two members are given, when the creator or any member is not a
   * known user id, or when the creator is not among the members. On success the group is
   * stored in both the persistent map and the shared cache and echoed back to the caller.
   *
   * @param ctx routing context of the current request
   */
  private void createGroup(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
      // No body at all is handled like any other missing field instead of failing with an NPE.
      respError(ctx.request(), 404, null);
      return;
    }
    String groupName = bodyAsJson.getString("groupName");
    String creater = bodyAsJson.getString("creater");
    JsonArray members = bodyAsJson.getJsonArray("members");

    if (StringUtil.isNullOrEmpty(groupName)
            || StringUtil.isNullOrEmpty(creater)
            || members == null
            || members.size() < 2) {
      respError(ctx.request(), 404, null);
      return;
    }

    // Creator and members are user ids; each one must resolve to a registered user.
    boolean createrKnown = userNameAndIdMapCache.get(creater) != null;
    long knownMembers = members.stream()
            .filter(memberId -> userNameAndIdMapCache.get(memberId) != null)
            .count();
    boolean createrIsMember = members.contains(creater); // the creator must be part of the group
    if (!createrKnown || knownMembers != members.size() || !createrIsMember) {
      respError(ctx.request(), 404, null);
      return;
    }
    // TODO check that the members are friends of the creator
    // TODO decide how to treat the creator / members being online or offline

    String groupId = Utils.uuid();
    JsonObject body = new JsonObject();
    body.put("id", groupId);
    body.put("groupName", groupName);
    body.put("creater", creater);
    body.put("createTime", Utils.time());
    body.put("members", members);
    groupMap.put(groupId, body.encode());
    groupMapCache.put(groupId, body);

    resp(ctx.request(), body);
  }

  /**
   * Returns the private chat history between {@code sender} and {@code receiver}
   * (both query parameters are login names).
   *
   * <p>Checks run in this order: both parameters present (404), sender logged in (500),
   * receiver registered (500), sender has a friend list (500), receiver is in that list (500).
   * The history is looked up under the two user ids joined by {@code "-"}, smaller id first,
   * and returned sorted under the {@code "msg"} field; an empty array is returned when nothing
   * has been stored yet.
   *
   * @param ctx routing context of the current request
   */
  private void queryMessage(RoutingContext ctx) {
    HttpServerRequest request = ctx.request();
    String senderName = request.getParam("sender");
    String receiverName = request.getParam("receiver");
    if (StringUtil.isNullOrEmpty(senderName) || StringUtil.isNullOrEmpty(receiverName)) {
      respError(request, 404, null);
      return;
    }
    if (onlineMapCache.get(senderName) == null) {
      respError(request, 500, "请登录后使用");
      return;
    }
    Object receiverRecord = userMapCache.get(receiverName);
    if (receiverRecord == null) {
      respError(request, 500, "不存在");
      return;
    }

    JsonObject senderJson = (JsonObject) userMapCache.get(senderName);
    JsonObject receiverJson = (JsonObject) receiverRecord;
    Object friendList = friendMapCache.get(senderJson.getString("id"));
    if (friendList == null) {
      respError(request, 500, "不存在");
      return;
    }
    if (!((JsonArray) friendList).contains(receiverJson.getString("id"))) {
      respError(request, 500, "不是好友关系");
      return;
    }

    String senderId = senderJson.getString("id");
    String receiverId = receiverJson.getString("id");
    // Both directions of a conversation share one key: "<smaller id>-<larger id>".
    String msgMapKey;
    if (senderId.compareTo(receiverId) < 0) {
      msgMapKey = senderId + "-" + receiverId;
    } else {
      msgMapKey = receiverId + "-" + senderId;
    }

    Object history = messageMapCache.get(msgMapKey);
    if (history == null) {
      resp(request, new JsonObject().put("msg", new JsonArray()));
      return;
    }
    List<Object> sortMsgs = ((JsonArray) history).stream().sorted().collect(Collectors.toList());
    resp(request, new JsonObject().put("msg", sortMsgs));
  }

  /**
   * Returns the friend list of the logged-in user named by the {@code login} query parameter.
   *
   * <p>Answers 404 when the parameter is missing, 500 when the user is not logged in or has no
   * friend list yet; otherwise the stored list is returned under a field named after the login.
   *
   * @param context routing context of the current request
   */
  private void queryFriends(RoutingContext context) {
    HttpServerRequest request = context.request();
    String login = request.getParam("login");
    if (StringUtil.isNullOrEmpty(login)) {
      respError(request, 404, "用户不存在");
      return;
    }
    if (onlineMapCache.get(login) == null) {
      respError(request, 500, "请先登录");
      return;
    }

    JsonObject userJson = (JsonObject) userMapCache.get(login);
    String userId = userJson.getString("id");
    Object friendsObj = friendMapCache.get(userId);
    if (friendsObj == null) {
      respError(request, 500, "请至少添加一个好友");
      return;
    }

    log.info("friendsObj --> " + friendsObj.getClass().getName());
    JsonObject answer = new JsonObject().put(login, friendsObj);
    resp(request, answer);
  }

  /**
   * Returns every registered user, keyed by login.
   *
   * <p>The {@code passwd} field is removed from each record before it is sent, matching what
   * {@code register} already does for its own response; the cached records are not modified
   * because only a copy is edited.
   *
   * @param ctx routing context of the current request
   */
  private void queryUsers(RoutingContext ctx) {
    JsonObject data = new JsonObject();
    userMapCache.forEach((login, record) -> {
      if (record instanceof JsonObject) {
        JsonObject publicView = ((JsonObject) record).copy();
        publicView.remove("passwd");
        data.put(login, publicView);
      } else {
        data.put(login, record);
      }
    });
    resp(ctx.request(), data);
  }

  /**
   * Returns the number of currently logged-in users as {@code {"count": n}}.
   *
   * @param ctx routing context of the current request
   */
  private void queryUsersCount(RoutingContext ctx) {
    // The map already knows its size; no need to stream (let alone in parallel) over the keys.
    long count = onlineMapCache.size();
    resp(ctx.request(), new JsonObject().put("count", count));
  }

  /**
   * Logs out the user named by the {@code login} query parameter.
   *
   * <p>Answers 404 when the parameter is missing or the user is unknown. A user who is not
   * logged in gets the success message straight away. Otherwise the user is removed from both
   * online maps and a {@code "sweet-logout"} event-bus message carrying {@code loginName} is
   * sent; the HTTP response is written from the reply ({@code code} "1" means success, any
   * other code "no data", a failed send answers 500).
   *
   * @param ctx routing context of the current request
   */
  private void logout(RoutingContext ctx) {
    HttpServerRequest request = ctx.request();
    String login = request.getParam("login");
    if (StringUtil.isNullOrEmpty(login) || userMapCache.get(login) == null) {
      respError(request, 404, null);
      return;
    }
    if (onlineMapCache.get(login) == null) {
      resp(request, new JsonObject().put("msg", "成功退出"));
      return;
    }

    onlineMapCache.remove(login);
    onlineMap.remove(login);

    JsonObject sendJson = new JsonObject().put("loginName", login);
    eventBus.send("sweet-logout", sendJson, res -> {
      if (!res.succeeded()) {
        respError(ctx.request(), 500, res.cause().getMessage());
        return;
      }
      JsonObject body = (JsonObject) res.result().body();
      String code = body.getString("code");
      String outcome = code.equals("1") ? "成功退出" : "没有数据";
      resp(ctx.request(), new JsonObject().put("msg", outcome));
    });
  }

  /**
   * Logs a user in from the JSON body {@code {login, passwd}}.
   *
   * <p>Answers 404 when the body or a field is missing, and 404 with the same error text for
   * both an unknown login and a wrong password. On success the login time is recorded in both
   * online maps.
   *
   * <p>NOTE(review): passwords are stored and compared in plain text.
   *
   * @param ctx routing context of the current request
   */
  private void login(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
      // A missing body is handled like missing fields instead of failing with an NPE.
      respError(ctx.request(), 404, null);
      return;
    }
    String login = bodyAsJson.getString("login");
    String passwd = bodyAsJson.getString("passwd");
    if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(passwd)) {
      respError(ctx.request(), 404, null);
      return;
    }
    Object userValue = userMapCache.get(login);
    if (userValue == null) {
      respError(ctx.request(), 404, "用户名或密码错误");
      return;
    }
    JsonObject userJson = (JsonObject) userValue;
    // passwd is known to be non-null here, so a stored record without "passwd" simply fails
    // the comparison instead of throwing.
    if (passwd.equals(userJson.getString("passwd"))) {
      String time = Utils.time();
      onlineMap.put(login, time);
      onlineMapCache.put(login, time);
      resp(ctx.request(), new JsonObject().put("msg", "登录成功"));
    } else {
      respError(ctx.request(), 404, "用户名或密码错误");
    }
  }

  /**
   * Adds a friend from the JSON body {@code {login, add}} (both are login names).
   *
   * <p>Answers 404 when the body or a field is missing, when either user is unknown, or when
   * the user to add is not online; answers 500 when the friend is already in the list. Only
   * the list of {@code login} is updated; the other user's list is left untouched.
   *
   * @param ctx routing context of the current request
   */
  private void addFriend(RoutingContext ctx) {
    JsonObject bodyAsJson = ctx.getBodyAsJson();
    if (bodyAsJson == null) {
      // A missing body is handled like missing fields instead of failing with an NPE.
      respError(ctx.request(), 404, "找不到该用户");
      return;
    }
    String login = bodyAsJson.getString("login"); // user sending the request
    String addName = bodyAsJson.getString("add"); // user to be added as a friend
    if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(addName)) {
      respError(ctx.request(), 404, "找不到该用户");
      return;
    }
    Object loginRecord = userMapCache.get(login);
    Object addRecord = userMapCache.get(addName);
    if (loginRecord == null || addRecord == null) {
      respError(ctx.request(), 404, "找不到该用户");
      return;
    }
    if (onlineMapCache.get(addName) == null) { // the user to add has to be online
      respError(ctx.request(), 404, "该用户不在线");
      return;
    }

    String id = ((JsonObject) loginRecord).getString("id");
    String addUserId = ((JsonObject) addRecord).getString("id");

    // Start from the stored list when there is one, otherwise from an empty list, so that
    // the persist step below is written only once.
    Object stored = friendMapCache.get(id);
    JsonArray friends = stored == null ? new JsonArray() : (JsonArray) stored;
    if (friends.contains(addUserId)) {
      respError(ctx.request(), 500, "已添加过好友");
      return;
    }
    friends.add(addUserId);
    friendMap.put(id, friends.encode());
    friendMapCache.put(id, friends);

    resp(ctx.request(), new JsonObject().put("msg", "添加成功"));
  }

  /**
   * Registers a user from the JSON body {@code {login, name, passwd}}.
   *
   * <p>Answers 404 when the body or a field is missing and 405 when the login is already
   * taken. On success the user record is stored (persistent map and cache), the id-to-login
   * index is updated, and the record is returned without its {@code passwd} field.
   *
   * <p>NOTE(review): the password is stored in plain text.
   *
   * @param routingContext routing context of the current request
   */
  private void register(RoutingContext routingContext) {
    JsonObject bodyAsJson = routingContext.getBodyAsJson();
    if (bodyAsJson == null) {
      // A missing body is handled like missing fields instead of failing with an NPE.
      respError(routingContext.request(), 404, null);
      return;
    }
    // Log the request without the password so that credentials never reach the log file.
    JsonObject loggable = bodyAsJson.copy();
    loggable.remove("passwd");
    log.info(loggable);

    String login = bodyAsJson.getString("login");
    String name = bodyAsJson.getString("name");
    String passwd = bodyAsJson.getString("passwd");
    if (StringUtil.isNullOrEmpty(login)
            || StringUtil.isNullOrEmpty(name)
            || StringUtil.isNullOrEmpty(passwd)) {
      respError(routingContext.request(), 404, null);
      return;
    }
    if (userMapCache.get(login) != null) {
      respError(routingContext.request(), 405, null);
      return;
    }

    String uuid = Utils.uuid();
    JsonObject obj = new JsonObject()
            .put("id", uuid)
            .put("name", name)
            .put("login", login)
            .put("passwd", passwd)
            .put("createTime", Utils.time());
    userMap.put(login, obj.encode());
    userMapCache.put(login, obj);
    userNameAndIdMap.put(uuid, login);
    userNameAndIdMapCache.put(uuid, login);

    // The response is a copy so that removing the password does not touch the stored record.
    JsonObject copy = obj.copy();
    copy.remove("passwd");
    resp(routingContext.request(), copy);
  }

  /**
   * Opens the MapDB file under {@code filePath}, opens or creates every persistent map, and
   * copies their content into the shared local maps used as in-memory caches.
   *
   * <p>The two message caches are only looked up here; this method does not fill them.
   *
   * @return a future completed with {@code true} on success, or failed with the exception
   *         that interrupted the initialisation
   */
  private Future<Boolean> initDB() {
    Future<Boolean> initDBFuture = Future.future();
    try {
      db = DBMaker.fileDB(filePath +"sweet-im.db").closeOnJvmShutdown().make();

      // Registered users; the file content is loaded into the cache.
      userMap = db.hashMap("user-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
      copyJsonObj(userMap, userMapCache);

      // Friend lists.
      friendMap = db.hashMap("friend-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
      copyJsonArray(friendMap, friendMapCache);

      // Currently logged-in users.
      onlineMap = db.hashMap("online-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
      copyString(onlineMap, onlineMapCache);

      // Private chat history.
      messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");

      // Group chat history.
      messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");

      // Groups.
      groupMap = db.hashMap("group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");
      copyJsonObj(groupMap, groupMapCache);

      // User id -> login index.
      userNameAndIdMap = db.hashMap("username-id-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
      copyString(userNameAndIdMap, userNameAndIdMapCache);

      initDBFuture.complete(true);
    } catch (Exception e) {
      log.error("initDB failed", e);
      // Fail with the exception itself: e.getCause() is frequently null, which would hide
      // the actual reason from the caller.
      initDBFuture.fail(e);
      // Release the file if it was opened, so that a later deployment can open it again.
      if (db != null && !db.isClosed()) {
        db.close();
      }
    }
    return initDBFuture;
  }

  /**
   * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
   * JSON object.
   *
   * @param sourceMap map whose values are JSON-object strings
   * @param targetMap map receiving the parsed {@link JsonObject} values under the same keys
   */
  private void copyJsonObj(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
      targetMap.put(entry.getKey(), new JsonObject(entry.getValue()));
    }
  }
  /**
   * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
   * JSON array.
   *
   * @param sourceMap map whose values are JSON-array strings
   * @param targetMap map receiving the parsed {@link JsonArray} values under the same keys
   */
  private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
      targetMap.put(entry.getKey(), new JsonArray(entry.getValue()));
    }
  }
  /**
   * Copies every entry of {@code sourceMap} into {@code targetMap} unchanged.
   *
   * @param sourceMap map to read from
   * @param targetMap map receiving the same keys and string values
   */
  private void copyString(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
      targetMap.put(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Ends the response of {@code request} with {@code ret} pretty-printed as JSON, after
   * setting the content-type, CORS and content-disposition headers. The status code is left
   * at its default.
   *
   * @param request request whose response is written
   * @param ret     JSON payload to send
   */
  private static void resp(HttpServerRequest request, JsonObject ret) {
    String payload = Json.encodePrettily(ret);
    request.response()
            .putHeader("content-type", "application/json;charset=utf-8")
            .putHeader("Access-Control-Allow-Origin", "*")
            .putHeader("Access-Control-Allow-Credentials", "true")
            .putHeader("Content-Disposition", "attachment")
            .end(payload);
  }

  /**
   * Ends the response of {@code request} with status {@code code} and the JSON body
   * {@code {"error": error}}, using the same headers as a regular response.
   *
   * @param request request whose response is written
   * @param code    HTTP status code to set
   * @param error   error text placed in the {@code "error"} field; callers also pass
   *                {@code null}
   */
  private static void respError(HttpServerRequest request, int code, String error) {
    JsonObject failure = new JsonObject().put("error", error);
    String payload = Json.encodePrettily(failure);
    request.response()
            .putHeader("content-type", "application/json;charset=utf-8")
            .putHeader("Access-Control-Allow-Origin", "*")
            .putHeader("Access-Control-Allow-Credentials", "true")
            .putHeader("Content-Disposition", "attachment")
            .setStatusCode(code)
            .end(payload);
  }

  /**
   * Closes every persistent map, then commits and closes the database if it is still open,
   * and finally signals that the verticle has stopped.
   *
   * @param stopFuture completed once the stores are closed
   */
  @Override
  public void stop(Future<Void> stopFuture) throws Exception {
    // The individual maps are closed before the database that owns them.
    userMap.close();
    friendMap.close();
    onlineMap.close();
    groupMap.close();
    userNameAndIdMap.close();

    boolean databaseOpen = !db.isClosed();
    if (databaseOpen) {
      db.commit();
      db.close();
    }

    stopFuture.complete();
  }
}
复制代码

SocketVerticle.java

package com.xiaoniu.im.socket;

import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * socket 通信模块 主要处理聊天
 * Created by sweet on 2017/9/26.
 */
public class SocketVerticle extends AbstractVerticle {
  private static final Logger log = LoggerFactory.getLogger(SocketVerticle.class);

  // Directory that holds the MapDB file; hard-coded, so it has to be edited per machine.
  private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
  // Port the WebSocket server listens on.
  private static final Integer port = 8081;

  private DB db;

  private HTreeMap<String, String> messageMap; // private chat history (persisted)
  private LocalMap<String, Object> messageMapCache;  // private chat history (shared in-memory copy)

  private HTreeMap<String, String> messageGroupMap; // group chat history (persisted)
  private LocalMap<String, Object> messageGroupMapCache;  // group chat history (shared in-memory copy)

  private LocalMap<String, Object> userMapCache; // registered users (shared in-memory copy)

  private LocalMap<String, Object> friendMapCache; // friend lists (shared in-memory copy)

  private LocalMap<String, Object> onlineMapCache; // currently logged-in users (shared in-memory copy)

  private LocalMap<String, Object> userNameAndIdMapCache; // user id -> login, for quick lookup (in-memory copy)

  private LocalMap<String, Object> groupMapCache; // groups (shared in-memory copy)

  private Map<String, ServerWebSocket> socketMap; // one socket connection per user, keyed by user id

  private EventBus eventBus; // receives logout events so the user's socket can be closed and dropped

  /**
   * Starts the chat side of the application.
   *
   * <p>After {@code initDB()} succeeds this method starts the WebSocket server on
   * {@code port}, schedules a periodic dump of the socket map to standard output, and
   * registers the {@code "sweet-logout"} event-bus consumer. If {@code initDB()} fails,
   * {@code future} is failed and nothing is started.
   *
   * @param future completion handle for the deployment of this verticle
   */
  @Override
  public void start(Future<Void> future) throws Exception {
    eventBus = vertx.eventBus();
    initDB().setHandler(res -> {
      if (res.failed()) {
        // Without the stores every handler below would run against null maps.
        future.fail(res.cause());
        return;
      }

      vertx.createHttpServer().websocketHandler(this::acceptSocket).listen(port, server -> {
        if (server.succeeded()) {
          log.info("socket server 启动成功 端口8081");
          future.complete();
        } else {
          log.error(server.cause().getMessage());
          server.cause().printStackTrace();
          future.fail(server.cause());
        }
      });

      vertx.setPeriodic(10000, timer -> {
        System.out.println("------------Socket Map Start---------");
        socketMap.forEach((k, v) -> System.out.println("k: " + k + ", v: " + v));
        System.out.println("------------Socket Map End  ---------");
      });

      // Logout events carry the login name, while socketMap is keyed by user id, so the
      // login has to be translated through the user record before the socket is looked up.
      eventBus.consumer("sweet-logout", msg -> {
        JsonObject body = (JsonObject) msg.body();
        String loginName = body.getString("loginName");
        Object user = StringUtil.isNullOrEmpty(loginName) ? null : userMapCache.get(loginName);
        String socketKey = user instanceof JsonObject ? ((JsonObject) user).getString("id") : null;
        ServerWebSocket serverWebSocket = socketKey == null ? null : socketMap.remove(socketKey);

        JsonObject replyMsg = new JsonObject();
        if (serverWebSocket != null) {
          try {
            serverWebSocket.close();
          } catch (IllegalStateException alreadyClosed) {
            // The peer may have gone away already; the logout itself still succeeded.
            log.warn("socket of " + loginName + " was already closed: " + alreadyClosed.getMessage());
          }
          replyMsg.put("code", "1"); // code 1: logged out, socket closed
        } else {
          replyMsg.put("code", "0"); // code 0: no socket found for this user
        }
        msg.reply(replyMsg);
      });
    });
  }

  /**
   * Validates a new WebSocket connection and wires its handlers.
   *
   * <p>The connection must target {@code /socket} with the query {@code id=<userId>}; the id
   * must belong to a registered user who is currently logged in. Anything else is rejected.
   */
  private void acceptSocket(ServerWebSocket serverWebSocket) {
    String path = serverWebSocket.path();
    String query = serverWebSocket.query();
    log.info("path: " + path + ", socket id: " + serverWebSocket.textHandlerID());
    log.info("query: " + query);

    // reject() alone fails the handshake; no additional close() is issued on a rejected socket.
    if (!"/socket".equals(path) || StringUtil.isNullOrEmpty(query) || !query.startsWith("id=")) {
      serverWebSocket.reject();
      return;
    }
    String userId = query.substring(3); // id of the connecting user
    if (StringUtil.isNullOrEmpty(userId)) {
      serverWebSocket.reject();
      return;
    }
    Object login = userNameAndIdMapCache.get(userId); // login of a registered user, or null
    if (login == null || StringUtil.isNullOrEmpty(login.toString())) {
      serverWebSocket.reject();
      return;
    }
    String loginName = login.toString();
    if (onlineMapCache.get(loginName) == null) { // the user has to be logged in
      serverWebSocket.reject();
      return;
    }

    // TODO decide how a page refresh should treat the existing socket connection
    put(socketMap, userId, serverWebSocket);

    serverWebSocket.handler(buffer -> {
      System.out.println("-------------Message------------");
      System.out.println("收到的buffer : " + buffer);
      System.out.println("-------------Message------------");
      dispatchFrame(serverWebSocket, userId, loginName, buffer.toJsonObject());
    });

    // Drop the mapping when the connection ends, but only if it still points at this socket
    // (a newer connection of the same user must not be removed).
    serverWebSocket.closeHandler(closed -> socketMap.remove(userId, serverWebSocket));

    serverWebSocket.exceptionHandler(ex -> {
      ex.printStackTrace();
      log.error(ex.getMessage());
    });
  }

  /** Routes one incoming JSON frame to private chat, group chat, or an error reply. */
  private void dispatchFrame(ServerWebSocket serverWebSocket, String userId, String loginName,
                             JsonObject jsonObject) {
    // Fields used by private chat.
    String to = jsonObject.getString("to"); // login of the receiver
    String from = jsonObject.getString("from"); // login of the sender
    String msg = jsonObject.getString("msg");
    // Field used by group chat (the sender is taken from "from" as well).
    String groupId = jsonObject.getString("groupId");

    // The sender must be given, logged in, and be the user who owns this connection.
    if (StringUtil.isNullOrEmpty(from) || !onlineMapCache.containsKey(from) || !from.equals(loginName)) {
      serverWebSocket.writeTextMessage("字段不能为空");
      return;
    }

    // TODO for now private messages can only be sent to logged-in users
    if (!StringUtil.isNullOrEmpty(to) && msg != null && onlineMapCache.containsKey(to)) {
      relayPrivateMessage(serverWebSocket, userId, from, to, msg);
    } else if (!StringUtil.isNullOrEmpty(groupId) && msg != null && groupMapCache.get(groupId) != null) {
      relayGroupMessage(serverWebSocket, groupId, from, msg);
    } else {
      serverWebSocket.writeTextMessage("字段不能为空!!!");
    }
  }

  /** Stores a private message and forwards it to the receiver when both users are friends. */
  private void relayPrivateMessage(ServerWebSocket senderSocket, String userId, String from,
                                   String to, String msg) {
    Object senderFriends = friendMapCache.get(userId);
    if (senderFriends == null) {
      senderSocket.writeTextMessage("你还没有好友");
      return;
    }
    JsonArray fromFriends = (JsonArray) senderFriends;

    JsonObject toUserJson = (JsonObject) userMapCache.get(to);
    String toUserId = toUserJson.getString("id");
    Object receiverFriends = friendMapCache.get(toUserId);
    if (receiverFriends == null) {
      senderSocket.writeTextMessage("你俩不是好友关系");
      return;
    }
    JsonArray toFriends = (JsonArray) receiverFriends;

    // Friendship has to be recorded in both directions.
    if (!fromFriends.contains(toUserId) || !toFriends.contains(userId)) {
      senderSocket.writeTextMessage("你俩不是好友关系2");
      return;
    }

    // Both directions of a conversation share one key: "<smaller id>-<larger id>".
    String msgMapKey = toUserId.compareTo(userId) < 0 ? toUserId+"-"+userId : userId+"-"+toUserId;
    String msgValue = Utils.time()+"-"+from+"-"+msg;
    Object stored = messageMapCache.get(msgMapKey);
    JsonArray history = stored == null ? new JsonArray() : (JsonArray) stored;
    history.add(msgValue);
    messageMap.put(msgMapKey, history.encode());
    messageMapCache.put(msgMapKey, history);

    // Being logged in over HTTP does not imply an open socket; the message stays in the
    // history either way.
    ServerWebSocket toUserServerWebSocket = socketMap.get(toUserId);
    if (toUserServerWebSocket == null) {
      log.warn("no open socket for user " + toUserId + ", message stored but not delivered");
      return;
    }
    toUserServerWebSocket.writeTextMessage(msg);
  }

  /** Forwards a group message when the sender is logged in and a member of the group. */
  private void relayGroupMessage(ServerWebSocket senderSocket, String groupId, String from,
                                 String msg) {
    log.info("==================群组聊天==================");
    JsonObject groupJsonObj = (JsonObject) groupMapCache.get(groupId);
    if (!onlineMapCache.containsKey(from)) {
      senderSocket.writeTextMessage("请先登录");
      return;
    }
    JsonArray members = groupJsonObj.getJsonArray("members"); // ids of the group members
    JsonObject fromUserJson = (JsonObject) userMapCache.get(from);
    if (members.contains(fromUserJson.getString("id"))) {
      sendGroupMessage(groupId, fromUserJson.getString("login"),
              fromUserJson.getString("id"), members, msg);
    } else {
      senderSocket.writeTextMessage("你不是群组成员");
    }
  }

  /**
   * Appends a message to the history of a group and forwards it to every other member that
   * currently has an open socket.
   *
   * <p>The history entry has the form {@code <time>-<sender login>-<message>} and is written
   * to both the persistent map and the shared cache. Members without an open socket are
   * skipped, so one missing connection no longer aborts delivery to the remaining members.
   *
   * @param groupId group id
   * @param fromUserLogin login of the sender
   * @param fromUserId id of the sender; excluded from the recipients
   * @param members ids of the group members
   * @param msg message text
   */
  private void sendGroupMessage(String groupId, String fromUserLogin, String fromUserId,
                                JsonArray members, String msg) {
    String msgValue = Utils.time()+"-"+fromUserLogin+"-"+msg; // history entry to store

    // Work on a copy so that the stored member list keeps the sender.
    JsonArray recipients = members.copy();
    recipients.remove(fromUserId);

    Object stored = messageGroupMapCache.get(groupId);
    JsonArray history = stored == null ? new JsonArray() : (JsonArray) stored;
    history.add(msgValue);
    messageGroupMap.put(groupId, history.encode());
    messageGroupMapCache.put(groupId, history);

    recipients.forEach(memberId -> {
      System.out.println("群发消息 " + Utils.time());
      ServerWebSocket serverWebSocket = socketMap.get(memberId);
      if (serverWebSocket == null) {
        // TODO offline delivery is not implemented; the message stays in the history
        log.warn("no open socket for group member " + memberId + ", skipped");
        return;
      }
      serverWebSocket.writeTextMessage(msg);
    });
  }

  /**
   * Registers {@code serverWebSocket} as the connection of {@code userId}.
   *
   * <p>If the user already had a connection, that one is closed and replaced. The new socket
   * is always registered, so a reconnecting user stays reachable.
   *
   * @param socketMap map of user id to open socket
   * @param userId id of the connecting user
   * @param serverWebSocket the new connection
   */
  private void put(Map<String, ServerWebSocket> socketMap, String userId, ServerWebSocket serverWebSocket) {
    // Register first: the map never has a gap, and anything reacting to the old socket
    // closing already sees the new mapping.
    ServerWebSocket previous = socketMap.put(userId, serverWebSocket);
    if (previous == null || previous == serverWebSocket) {
      return;
    }
    log.error(" ********* 连接存在,清除旧连接 ********* ");
    try {
      previous.close();
    } catch (RuntimeException e) {
      // Closing can fail when the old connection is already gone; the new one is in place.
      log.error("异常捕获, " + e.getMessage());
    }
  }

  /**
   * Opens the message database under {@code filePath}, loads the stored private and group
   * chat histories into the shared caches, looks up the remaining shared maps, and creates
   * the socket map.
   *
   * @return a future completed with {@code true} on success, or failed with the exception
   *         that interrupted the initialisation
   */
  private Future<Boolean> initDB() {
    Future<Boolean> initDBFuture = Future.future();
    try {
      // Private chat history.
      db = DBMaker.fileDB(filePath + "sweet-msg-im.db").closeOnJvmShutdown().make();
      messageMap = db.hashMap("message-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");
      copyJsonArray(messageMap, messageMapCache);

      // Group chat history.
      messageGroupMap = db.hashMap("message-group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
      messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");
      copyJsonArray(messageGroupMap, messageGroupMapCache);

      // Shared maps that this verticle only looks up here: users, friend lists,
      // logged-in users, user id -> login index, groups.
      userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
      friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
      onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
      userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
      groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");

      socketMap = new ConcurrentHashMap<>();

      initDBFuture.complete(true);
    } catch (Exception e) {
      log.error("initDB failed", e);
      // Fail with the exception itself: e.getCause() is frequently null, which would hide
      // the actual reason from the caller.
      initDBFuture.fail(e);
      // Release the file if it was opened, so that a later deployment can open it again.
      if (db != null && !db.isClosed()) {
        db.close();
      }
    }
    return initDBFuture;
  }

  /**
   * Closes both message maps, then commits and closes the database if it is still open, and
   * finally signals that the verticle has stopped.
   *
   * @param stopFuture completed once the stores are closed
   */
  @Override
  public void stop(Future<Void> stopFuture) throws Exception {
    // The individual maps are closed before the database that owns them.
    messageMap.close();
    messageGroupMap.close();

    boolean databaseOpen = !db.isClosed();
    if (databaseOpen) {
      db.commit();
      db.close();
    }

    stopFuture.complete();
  }

  /**
   * Copies every entry of {@code sourceMap} into {@code targetMap}, parsing each value as a
   * JSON array.
   *
   * @param sourceMap map whose values are JSON-array strings
   * @param targetMap map receiving the parsed {@link JsonArray} values under the same keys
   */
  private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
      targetMap.put(entry.getKey(), new JsonArray(entry.getValue()));
    }
  }
}
复制代码

BootVerticle.java

package com.xiaoniu.im;

import com.xiaoniu.im.rest.HttpVerticle;
import com.xiaoniu.im.socket.SocketVerticle;
import com.xiaoniu.im.utils.Runner;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

/**
 * Bootstrap verticle: deploys {@link HttpVerticle} and, once that deployment has succeeded,
 * {@link SocketVerticle}.
 * Created by sweet on 2017/9/26.
 */
public class BootVerticle extends AbstractVerticle{

  private static final Logger log = LoggerFactory.getLogger(BootVerticle.class);

  /**
   * Launches the application by handing this class to {@code Runner.runExample}.
   *
   * @param args command-line arguments; not read by this method
   */
  public static void main(String[] args) {
    Runner.runExample(BootVerticle.class);
  }

  /**
   * Deploys the two verticles one after the other.
   *
   * <p>{@code startFuture} is failed with the cause of whichever deployment fails first and
   * is completed once both deployments have succeeded.
   *
   * @param startFuture completion handle for the deployment of this verticle
   */
  @Override
  public void start(Future<Void> startFuture) throws Exception {
    vertx.deployVerticle(new HttpVerticle(), httpDeployment -> {
      if (httpDeployment.failed()) {
        startFuture.fail(httpDeployment.cause());
        return;
      }
      // The socket verticle is only deployed after the HTTP verticle is up.
      vertx.deployVerticle(new SocketVerticle(), socketDeployment -> {
        if (socketDeployment.failed()) {
          startFuture.fail(socketDeployment.cause());
          return;
        }
        startFuture.complete();
        log.info("全部部署 OK");
      });
    });
  }
}
复制代码

本项目使用 MapDB 存储数据,运行前需要修改代码里的文件存储路径(两个 Verticle 中的 filePath)。代码写得没那么精致,也没有使用配置文件;源码里附有 README,说明了如何使用。源码地址 链接:http://pan.baidu.com/s/1o8ysUQQ 密码:5crm

 类似资料: