HttpVerticle.java
package com.xiaoniu.im.rest;
import com.xiaoniu.im.utils.Runner;
import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* http 服务处理除聊天以外的业务
* Created by sweet on 2017/9/26.
*/
public class HttpVerticle extends AbstractVerticle {
private static final Logger log = LoggerFactory.getLogger(HttpVerticle.class);
public static void main(String[] args) {
Runner.runExample(HttpVerticle.class);
}
private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
private static final Integer port = 8080;
private DB db;
private HTreeMap<String, String> userMap; // 保存注册用户信息
private LocalMap<String, Object> userMapCache; // 保存注册用户信息(内存共享版)
private HTreeMap<String, String> userNameAndIdMap; // 用户id - 用户名 方便快速查询
private LocalMap<String, Object> userNameAndIdMapCache; // 用户id - 用户名 方便快速查询 (内存版)
private HTreeMap<String, String> friendMap; // 保存好友关系
private LocalMap<String, Object> friendMapCache; // 保存好友关系(内存共享版)
private HTreeMap<String, String> onlineMap; // 当前登录用户
private LocalMap<String, Object> onlineMapCache; // 当前登录用户 (内存共享版)
private LocalMap<String, Object> messageMapCache; // 保存用户私密聊天记录 (内存共享版)
private LocalMap<String, Object> messageGroupMapCache; // 保存群组聊天记录 (内存共享版)
private HTreeMap<String, String> groupMap; // 群组关系
private LocalMap<String, Object> groupMapCache; // 群组关系 (内存共享版)
private EventBus eventBus;
@Override
public void start(Future<Void> future) throws Exception {
eventBus = vertx.eventBus();
initDB().setHandler(init -> {
if (init.succeeded() && init.result()) {
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 用户注册
router.post("/register").consumes("application/json").handler(this::register);
// 用户登录
router.post("/login").consumes("application/json").handler(this::login);
// 用户退出 (暂定)
router.get("/logout").handler(this::logout);
// 添加好友
router.post("/api/friend").consumes("application/json").handler(this::addFriend);
// 查询 用户的好友列表
router.get("/api/friends").handler(this::queryFriends);
// 查询 私密用户聊天记录
router.get("/api/message").handler(this::queryMessage);
// 查询当前登录用户数
router.get("/api/online").handler(this::queryUsersCount);
// 查询所有注册用户
router.get("/api/users").handler(this::queryUsers);
// 新建群组
router.post("/api/group").consumes("application/json").handler(this::createGroup);
// 查询所有 Map (测试方便观察数据)
router.get("/maps").handler(this::queryAll);
vertx.createHttpServer().requestHandler(router::accept).listen(port, server -> {
if (server.succeeded()){
log.info("Http服务启动成功");
future.complete();
} else {
server.cause().printStackTrace();
log.error("Http服务启动失败," + server.cause().getMessage());
future.fail(server.cause());
}
});
vertx.exceptionHandler(ex -> {
ex.printStackTrace();
log.error("异常处理器: " + ex.getMessage());
});
} else {
future.fail(init.cause());
}
});
}
private void queryAll(RoutingContext ctx) {
JsonObject j = new JsonObject();
j.put("userMapCache", userMapCache);
j.put("userNameAndIdMapCache", userNameAndIdMapCache);
j.put("friendMapCache", friendMapCache);
j.put("onlineMapCache", onlineMapCache);
j.put("messageMapCache", messageMapCache);
j.put("groupMapCache", groupMapCache);
j.put("messageGroupMapCache", messageGroupMapCache);
resp(ctx.request(), j);
}
private void createGroup(RoutingContext ctx) {
JsonObject bodyAsJson = ctx.getBodyAsJson();
String groupName = bodyAsJson.getString("groupName");
String creater = bodyAsJson.getString("creater");
JsonArray members = bodyAsJson.getJsonArray("members");
if (StringUtil.isNullOrEmpty(groupName)
|| StringUtil.isNullOrEmpty(creater)
|| members == null
|| members.size() < 2) {
respError(ctx.request(), 404, null);
return;
}
Object o = userNameAndIdMapCache.get(creater);
long count = members.stream()
.filter(memberId -> userNameAndIdMapCache.get(memberId) != null)
.count();
boolean contains = members.contains(creater); // 要求成员中必须包含创建人
if (o == null || count != members.size() || !contains) {
respError(ctx.request(), 404, null);
return;
}
// TODO 检查群组中的人和创建人是否是好友关系
// TODO 创建人是否在线及细节群组中的人不在线如何处理,在线如何处理
String groupId = Utils.uuid(); // 群组ID
JsonObject body = new JsonObject();
body.put("id", groupId);
body.put("groupName", groupName);
body.put("creater", creater);
body.put("createTime", Utils.time());
body.put("members", members);
groupMap.put(groupId, body.encode());
groupMapCache.put(groupId, body);
resp(ctx.request(), body);
}
private void queryMessage(RoutingContext ctx) {
String senderName = ctx.request().getParam("sender");
String receiverName = ctx.request().getParam("receiver");
if (StringUtil.isNullOrEmpty(senderName) || StringUtil.isNullOrEmpty(receiverName)) {
respError(ctx.request(), 404, null);
return;
}
Object o = onlineMapCache.get(senderName);
if (o == null) {
respError(ctx.request(), 500, "请登录后使用");
return;
}
Object o1 = userMapCache.get(receiverName);
if (o1 == null) {
respError(ctx.request(), 500, "不存在");
return;
}
JsonObject senderJson = (JsonObject) userMapCache.get(senderName);
JsonObject receiverJson = (JsonObject) o1;
Object o2 = friendMapCache.get(senderJson.getString("id"));
if (o2 == null) {
respError(ctx.request(), 500, "不存在");
return;
}
JsonArray friends = (JsonArray) o2;
if (friends.contains(receiverJson.getString("id"))) {
String senderId = senderJson.getString("id");
String receiverId = receiverJson.getString("id");
String msgMapKey = senderId.compareTo(receiverId) < 0 ? senderId+"-"+receiverId : receiverId+"-"+senderId;
Object msgList = messageMapCache.get(msgMapKey);
if (msgList == null) {
resp(ctx.request(), new JsonObject().put("msg", new JsonArray()));
} else {
JsonArray msgArr = (JsonArray) msgList;
List<Object> sortMsgs = msgArr.stream().sorted().collect(Collectors.toList());
resp(ctx.request(), new JsonObject().put("msg", sortMsgs));
}
} else {
respError(ctx.request(), 500, "不是好友关系");
}
}
private void queryFriends(RoutingContext context) {
String login = context.request().getParam("login");
if (StringUtil.isNullOrEmpty(login)) {
respError(context.request(), 404, "用户不存在");
return;
}
Object o = onlineMapCache.get(login);
if (o == null) {
respError(context.request(), 500, "请先登录");
return;
}
JsonObject userJson = (JsonObject) userMapCache.get(login);
Object friendsObj = friendMapCache.get(userJson.getString("id"));
if (friendsObj == null) {
respError(context.request(), 500, "请至少添加一个好友");
return;
}
log.info("friendsObj --> " + friendsObj.getClass().getName());
resp(context.request(), new JsonObject().put(login, friendsObj));
}
private void queryUsers(RoutingContext ctx) {
JsonObject data = new JsonObject();
userMapCache.forEach((k, v) -> {
data.put(k, v);
});
resp(ctx.request(), data);
}
private void queryUsersCount(RoutingContext ctx) {
long count = onlineMapCache.keySet().parallelStream().count();
resp(ctx.request(), new JsonObject().put("count", count));
}
private void logout(RoutingContext ctx) {
String login = ctx.request().getParam("login");
if (StringUtil.isNullOrEmpty(login)){
respError(ctx.request(), 404, null);
return;
}
Object o = userMapCache.get(login);
if (o == null) {
respError(ctx.request(), 404, null);
return;
}
Object o1 = onlineMapCache.get(login);
if (o1 == null) {
resp(ctx.request(), new JsonObject().put("msg", "成功退出"));
return;
} else {
onlineMapCache.remove(login);
onlineMap.remove(login);
JsonObject sendJson = new JsonObject();
sendJson.put("loginName", login);
eventBus.send("sweet-logout", sendJson, res -> {
if (res.succeeded()) {
JsonObject body = (JsonObject) res.result().body();
String code = body.getString("code");
if (code.equals("1")) {
resp(ctx.request(), new JsonObject().put("msg", "成功退出"));
} else {
resp(ctx.request(), new JsonObject().put("msg", "没有数据"));
}
} else {
respError(ctx.request(), 500, res.cause().getMessage());
}
});
}
}
private void login(RoutingContext ctx) {
JsonObject bodyAsJson = ctx.getBodyAsJson();
String login = bodyAsJson.getString("login");
String passwd = bodyAsJson.getString("passwd");
if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(passwd)) {
respError(ctx.request(), 404, null);
return;
}
Object userValue = userMapCache.get(login);
if (userValue == null) {
respError(ctx.request(), 404, "用户名或密码错误");
return;
}
JsonObject userJson = (JsonObject) userValue;
if (userJson.getString("passwd").equals(passwd)) {
String time = Utils.time();
onlineMap.put(login, time);
onlineMapCache.put(login, time);
resp(ctx.request(), new JsonObject().put("msg", "登录成功"));
} else {
respError(ctx.request(), 404, "用户名或密码错误");
}
}
private void addFriend(RoutingContext ctx) {
JsonObject bodyAsJson = ctx.getBodyAsJson();
String login = bodyAsJson.getString("login"); // 登录用户
String addName = bodyAsJson.getString("add"); // 要添加的好友
if (StringUtil.isNullOrEmpty(login) || StringUtil.isNullOrEmpty(addName)) {
respError(ctx.request(), 404, "找不到该用户");
return;
}
Object o = userMapCache.get(login);
Object o1 = userMapCache.get(addName);
if (o == null || o1 == null) {
respError(ctx.request(), 404, "找不到该用户");
return;
}
Object o2 = onlineMapCache.get(addName); // 检查要添加的好友是否在线
if (o2 == null) {
respError(ctx.request(), 404, "该用户不在线");
return;
}
JsonObject loginUser = (JsonObject) o;
JsonObject addUser = (JsonObject) o1;
String id = loginUser.getString("id");
String addUserId = addUser.getString("id");
Object o3 = friendMapCache.get(id);
if (o3 == null) {
JsonArray arr = new JsonArray();
arr.add(addUserId);
friendMap.put(id, arr.encode());
friendMapCache.put(id, arr);
} else {
JsonArray arr = (JsonArray) o3;
if (arr.contains(addUserId)) {
respError(ctx.request(), 500, "已添加过好友");
return;
}
arr.add(addUserId);
friendMap.put(id, arr.encode());
friendMapCache.put(id, arr);
}
resp(ctx.request(), new JsonObject().put("msg", "添加成功"));
}
private void register(RoutingContext routingContext) {
JsonObject bodyAsJson = routingContext.getBodyAsJson();
log.info(bodyAsJson);
String login = bodyAsJson.getString("login");
String name = bodyAsJson.getString("name");
String passwd = bodyAsJson.getString("passwd");
if (StringUtil.isNullOrEmpty(login)
|| StringUtil.isNullOrEmpty(name)
|| StringUtil.isNullOrEmpty(passwd)) {
respError(routingContext.request(), 404,null);
return;
}
Object v = userMapCache.get(login);
if (v != null) {
respError(routingContext.request(), 405, null);
return;
}
String uuid = Utils.uuid();
JsonObject obj = new JsonObject()
.put("id", uuid)
.put("name", name)
.put("login", login)
.put("passwd", passwd)
.put("createTime", Utils.time());
userMap.put(login, obj.encode());
userMapCache.put(login, obj);
userNameAndIdMap.put(uuid, login);
userNameAndIdMapCache.put(uuid, login);
JsonObject copy = obj.copy();
copy.remove("passwd");
resp(routingContext.request(), copy);
}
private Future<Boolean> initDB() {
Future<Boolean> initDBFuture = Future.future();
try {
db = DBMaker.fileDB(filePath +"sweet-im.db").closeOnJvmShutdown().make();
// 保存注册用户信息
userMap = db.hashMap("user-db", Serializer.STRING, Serializer.STRING).createOrOpen();
userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
copyJsonObj(userMap, userMapCache); // 把文件中的用户数据缓存到 内存中
// 保存好友关系
friendMap = db.hashMap("friend-db", Serializer.STRING, Serializer.STRING).createOrOpen();
friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
copyJsonArray(friendMap, friendMapCache);
// 当前登录用户
onlineMap = db.hashMap("online-db", Serializer.STRING, Serializer.STRING).createOrOpen();
onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
copyString(onlineMap, onlineMapCache);
// 私密聊天的消息记录
messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");
// 保存群组聊天记录
messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");
// 群组关系
groupMap = db.hashMap("group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");
copyJsonObj(groupMap, groupMapCache);
// 用户名 - 用户id
userNameAndIdMap = db.hashMap("username-id-db", Serializer.STRING, Serializer.STRING).createOrOpen();
userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
copyString(userNameAndIdMap, userNameAndIdMapCache);
initDBFuture.complete(true);
} catch (Exception e) {
e.printStackTrace();
initDBFuture.fail(e.getCause());
}
return initDBFuture;
}
private void copyJsonObj(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
sourceMap.forEach((k, v) -> targetMap.put(k, new JsonObject(v)));
}
private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
sourceMap.forEach((k, v) -> targetMap.put(k, new JsonArray(v)));
}
private void copyString(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
sourceMap.forEach((k, v) -> targetMap.put(k, v));
}
private static void resp(HttpServerRequest request, JsonObject ret) {
request.response()
.putHeader("content-type", "application/json;charset=utf-8")
.putHeader("Access-Control-Allow-Origin", "*")
.putHeader("Access-Control-Allow-Credentials", "true")
.putHeader("Content-Disposition", "attachment")
.end(Json.encodePrettily(ret));
}
private static void respError(HttpServerRequest request, int code, String error) {
request.response()
.putHeader("content-type", "application/json;charset=utf-8")
.putHeader("Access-Control-Allow-Origin", "*")
.putHeader("Access-Control-Allow-Credentials", "true")
.putHeader("Content-Disposition", "attachment")
.setStatusCode(code)
.end(Json.encodePrettily(new JsonObject().put("error", error)));
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
userMap.close();
friendMap.close();
onlineMap.close();
groupMap.close();
userNameAndIdMap.close();
if (!db.isClosed()) {
db.commit();
db.close();
}
stopFuture.complete();
}
}
复制代码
SocketVerticle.java
package com.xiaoniu.im.socket;
import com.xiaoniu.im.utils.Utils;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* socket 通信模块 主要处理聊天
* Created by sweet on 2017/9/26.
*/
public class SocketVerticle extends AbstractVerticle {
private static final Logger log = LoggerFactory.getLogger(SocketVerticle.class);
private static final String filePath = "C:\\xiaoniu_doc\\vertx\\sweet-im\\";
private static final Integer port = 8081;
private DB db;
private HTreeMap<String, String> messageMap; // 保存用户私密聊天记录
private LocalMap<String, Object> messageMapCache; // 保存用户私密聊天记录 (内存共享版)
private HTreeMap<String, String> messageGroupMap; // 保存群组聊天记录
private LocalMap<String, Object> messageGroupMapCache; // 保存群组聊天记录 (内存共享版)
private LocalMap<String, Object> userMapCache; // 保存注册用户信息(内存共享版)
private LocalMap<String, Object> friendMapCache; // 保存好友关系(内存共享版)
private LocalMap<String, Object> onlineMapCache; // 当前登录用户 (内存共享版)
private LocalMap<String, Object> userNameAndIdMapCache; // 用户id - 用户名 方便快速查询 (内存版)
private LocalMap<String, Object> groupMapCache; // 群组关系 (内存共享版)
private Map<String, ServerWebSocket> socketMap; // 每个用户对应一个 socket连接
private EventBus eventBus; // 处理用户退出,关闭并删除 socket 引用
@Override
public void start(Future<Void> future) throws Exception {
eventBus = vertx.eventBus();
initDB().setHandler(res -> {
vertx.createHttpServer().websocketHandler(serverWebSocket -> {
String path = serverWebSocket.path();
String query = serverWebSocket.query();
log.info("path: " + path + ", socket id: " + serverWebSocket.textHandlerID());
log.info("query: " + query);
if (!"/socket".equals(path) || StringUtil.isNullOrEmpty(query) || !query.startsWith("id=")) {
serverWebSocket.reject();
serverWebSocket.close();
return;
}
String userId = query.substring(3, query.length()); // 当前登录用户的ID
if (StringUtil.isNullOrEmpty(userId)) {
serverWebSocket.reject();
serverWebSocket.close();
return;
}
Object o = userNameAndIdMapCache.get(userId); // 判断用户是否是注册用户 o = 用户name
if (o == null || StringUtil.isNullOrEmpty(o.toString())) {
serverWebSocket.reject();
serverWebSocket.close();
return;
}
Object o1 = onlineMapCache.get(o.toString()); // 判断用户是否在线
if (o1 == null) {
serverWebSocket.reject();
serverWebSocket.close();
return;
}
// TODO 用户刷新如何处理 Socket连接 ?
put(socketMap, userId, serverWebSocket);
serverWebSocket.handler(buffer -> {
System.out.println("-------------Message------------");
System.out.println("收到的buffer : " + buffer);
System.out.println("-------------Message------------");
JsonObject jsonObject = buffer.toJsonObject();
// 好友之间聊天 所需字段
String to = jsonObject.getString("to"); // 接受人的名字
String from = jsonObject.getString("from"); // 发送人的名字
String msg = jsonObject.getString("msg");
// 群组之间聊天 所需字段
String groupId = jsonObject.getString("groupId");
String fromForGroup = jsonObject.getString("from");
if (StringUtil.isNullOrEmpty(from) || !onlineMapCache.containsKey(from) || !from.equals(o)) {
serverWebSocket.writeTextMessage("字段不能为空"); // 缺少字段 和 发送人没有登录
return;
}
// 好友之间聊天 start ===========================================
// 好友之间聊天 TODO 全部暂定只能发送在线用户
if (!StringUtil.isNullOrEmpty(to) && msg != null && onlineMapCache.containsKey(to)) {
Object o4 = friendMapCache.get(userId);
if (o4 == null) {
serverWebSocket.writeTextMessage("你还没有好友"); // 缺少字段 和 发送人没有登录
return;
}
JsonArray fromFriends = (JsonArray) o4; // 发送人的好友
Object o2 = userMapCache.get(to);
JsonObject toUserJson = (JsonObject) o2;
Object o5 = friendMapCache.get(toUserJson.getString("id"));
if (o5 == null) {
serverWebSocket.writeTextMessage("你俩不是好友关系"); // 缺少字段 和 发送人没有登录
return;
}
JsonArray toFriends = (JsonArray) o5;
if (fromFriends.contains(toUserJson.getString("id")) && toFriends.contains(userId)) { // 确定双方好友关系
String toUserId = toUserJson.getString("id"); // 接收人ID
ServerWebSocket toUserServerWebSocket = socketMap.get(toUserId); // TODO 暂时不做判断 是否在线,不在线如何处理
String msgMapKey = toUserId.compareTo(userId) < 0 ? toUserId+"-"+userId : userId+"-"+toUserId;
String msgValue = Utils.time()+"-"+from+"-"+msg;
if (messageMapCache.containsKey(msgMapKey)) {
Object o3 = messageMapCache.get(msgMapKey);
JsonArray msgArr = (JsonArray) o3;
msgArr.add(msgValue);
messageMap.put(msgMapKey, msgArr.encode());
messageMapCache.put(msgMapKey, msgArr);
} else {
JsonArray jsonArray = new JsonArray();
jsonArray.add(msgValue);
messageMap.put(msgMapKey, jsonArray.encode());
messageMapCache.put(msgMapKey, jsonArray);
}
toUserServerWebSocket.writeTextMessage(msg);
return;
} else {
serverWebSocket.writeTextMessage("你俩不是好友关系2"); // 缺少字段 和 发送人没有登录
return;
}// 好友之间聊天 end ===========================================
// 群组之间聊天 start ===========================================
} else if (!StringUtil.isNullOrEmpty(groupId)
&& !StringUtil.isNullOrEmpty(fromForGroup)
&& msg != null && groupMapCache.get(groupId) != null) {
log.info("==================群组聊天==================");
Object o2 = groupMapCache.get(groupId);
JsonObject groupJsonObj = (JsonObject) o2;
if (onlineMapCache.containsKey(fromForGroup)) {
JsonArray members = groupJsonObj.getJsonArray("members"); // 群组成员
JsonObject fromUserJson = (JsonObject) userMapCache.get(fromForGroup);
if (members.contains(fromUserJson.getString("id"))) {
sendGroupMessage(groupId, fromUserJson.getString("login"),
fromUserJson.getString("id"), members, msg); // 给全部群组成员发送消息
} else {
serverWebSocket.writeTextMessage("你不是群组成员");
}
return;
} else {
serverWebSocket.writeTextMessage("请先登录");
return;
} // 群组之间聊天 end ===========================================
} else {
// 其他不符合情况
serverWebSocket.writeTextMessage("字段不能为空!!!");
return;
}
});
// 异常处理
serverWebSocket.exceptionHandler(ex -> {
ex.printStackTrace();
log.error(ex.getMessage());
});
}).listen(port, server -> {
if (server.succeeded()) {
log.info("socket server 启动成功 端口8081");
future.complete();
} else {
log.error(server.cause().getMessage());
server.cause().printStackTrace();
future.fail(server.cause());
}
});
vertx.setPeriodic(10000, timer -> {
System.out.println("------------Socket Map Start---------");
socketMap.forEach((k, v) -> System.out.println("k: " + k + ", v: " + v));
System.out.println("------------Socket Map End ---------");
});
// vertx.setPeriodic(10000, timer -> {
// System.out.println("----------Message Map Start-----------");
// messageMapCache.forEach((k, v) -> System.out.println("k: " + k + ", v: " + v));
// System.out.println("----------Message Map End -----------");
// });
// 接受用户退出消息,然后处理 SocketMap中的引用
eventBus.consumer("sweet-logout", msg -> {
JsonObject body = (JsonObject) msg.body();
String loginName = body.getString("loginName");
ServerWebSocket serverWebSocket = socketMap.get(loginName);
JsonObject replyMsg = new JsonObject();
if (serverWebSocket != null) {
serverWebSocket.close();
socketMap.remove(loginName);
replyMsg.put("code", "1"); // code 1 退出成功
msg.reply(replyMsg);
} else {
replyMsg.put("code", "0"); // code 0 没有数据
msg.reply(replyMsg);
}
});
});
}
/**
* @param groupId 群组id
* @param fromUserLogin 发送人login
* @param fromUserId 发送人Id
* @param members 群组成员 id
* @param msg 消息
*/
private void sendGroupMessage(String groupId, String fromUserLogin, String fromUserId,
JsonArray members, String msg) {
String msgValue = Utils.time()+"-"+fromUserLogin+"-"+msg; // 要保存的聊天记录
JsonArray copy = members.copy();
copy.remove(fromUserId);
if (messageGroupMapCache.containsKey(groupId)) {
Object o = messageGroupMapCache.get(groupId);
JsonArray messageGroupArr = (JsonArray) o; // 群组聊天记录
messageGroupArr.add(msgValue);
messageGroupMap.put(groupId, messageGroupArr.encode());
messageGroupMapCache.put(groupId, messageGroupArr);
} else {
JsonArray msgArr = new JsonArray();
msgArr.add(msgValue);
messageGroupMap.put(groupId, msgArr.encode());
messageGroupMapCache.put(groupId, msgArr);
}
copy.forEach(memberId -> {
System.out.println("群发消息 " + Utils.time());
ServerWebSocket serverWebSocket = socketMap.get(memberId); // TODO 没有处理不在线的情况,方便测试默认都在线
serverWebSocket.writeTextMessage(msg);
});
}
private void put(Map<String, ServerWebSocket> socketMap, String userId, ServerWebSocket serverWebSocket) {
try {
if (socketMap.containsKey(userId)) {
log.error(" ********* 连接存在,清除旧连接 ********* ");
ServerWebSocket serverWebSocket1 = socketMap.get(userId);
serverWebSocket1.close();
socketMap.remove(userId);
} else {
socketMap.put(userId, serverWebSocket);
}
} catch (Exception e) {
log.error("异常捕获, " + e.getMessage());
socketMap.put(userId, serverWebSocket);
}
}
private Future<Boolean> initDB() {
Future<Boolean> initDBFuture = Future.future();
try {
// 保存用户私密聊天记录
db = DBMaker.fileDB(filePath + "sweet-msg-im.db").closeOnJvmShutdown().make();
messageMap = db.hashMap("message-db", Serializer.STRING, Serializer.STRING).createOrOpen();
messageMapCache = vertx.sharedData().getLocalMap("message-db-cache");
copyJsonArray(messageMap, messageMapCache);
// 保存群组聊天记录
messageGroupMap = db.hashMap("message-group-db", Serializer.STRING, Serializer.STRING).createOrOpen();
messageGroupMapCache = vertx.sharedData().getLocalMap("message-group-db-cache");
copyJsonArray(messageGroupMap, messageGroupMapCache);
// 保存注册用户信息
userMapCache = vertx.sharedData().getLocalMap("user-db-cache");
// 保存好友关系
friendMapCache = vertx.sharedData().getLocalMap("friend-db-cache");
// 当前登录用户
onlineMapCache = vertx.sharedData().getLocalMap("online-db-cache");
// 用户名 - 用户id
userNameAndIdMapCache = vertx.sharedData().getLocalMap("username-id-db-cache");
// 群组关系
groupMapCache = vertx.sharedData().getLocalMap("group-db-cache");
socketMap = new ConcurrentHashMap<>();
initDBFuture.complete(true);
} catch (Exception e) {
e.printStackTrace();
initDBFuture.fail(e.getCause());
}
return initDBFuture;
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
messageMap.close();
messageGroupMap.close();
if (!db.isClosed()) {
db.commit();
db.close();
}
stopFuture.complete();
}
private void copyJsonArray(Map<String, String> sourceMap, LocalMap<String, Object> targetMap) {
sourceMap.forEach((k, v) -> targetMap.put(k, new JsonArray(v)));
}
}
复制代码
BootVerticle.java
package com.xiaoniu.im;
import com.xiaoniu.im.rest.HttpVerticle;
import com.xiaoniu.im.socket.SocketVerticle;
import com.xiaoniu.im.utils.Runner;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
/**
* Verticle 启动类
* Created by sweet on 2017/9/26.
*/
public class BootVerticle extends AbstractVerticle{
private static final Logger log = LoggerFactory.getLogger(BootVerticle.class);
public static void main(String[] args) {
Runner.runExample(BootVerticle.class);
}
@Override
public void start(Future<Void> startFuture) throws Exception {
Future<String> future = Future.future();
Future<String> future1 = Future.future();
vertx.deployVerticle(new HttpVerticle(), future1);
future1.setHandler(res -> {
if (res.succeeded()) {
vertx.deployVerticle(new SocketVerticle(), future);
} else {
startFuture.fail(res.cause());
return;
}
});
future.setHandler(handler -> {
if (handler.succeeded()) {
startFuture.complete();
log.info("全部部署 OK");
} else {
startFuture.fail(handler.cause());
}
});
}
}
复制代码
使用MapDB存储,需要修改代码里的文件存储路径,代码写的没那么精致,没有使用配置文件,源码里有readme,写了如何使用 源码地址 链接:http://pan.baidu.com/s/1o8ysUQQ 密码:5crm