import com.fasterxml.jackson.databind.ObjectMapper;
import entity.Result;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import notice.config.ApplicationContextProvider;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ObjectMapper MAPPER=new ObjectMapper();
//从spring容器中获取消息监听器容器,处理订阅消息sysNotice
SimpleMessageListenerContainer sysNoticeContainer= (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
SimpleMessageListenerContainer userNoticeContainer= (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("userNoticeContainer");
RabbitTemplate rabbitTemplate=ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
//存放webSocket连接Map,根据用户id存放
public static ConcurrentHashMap<String, Channel>userChannelMap=new ConcurrentHashMap();
//用户请求webSocket服务端,执行的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{“userId”:"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析json数据,获取用户id
String userId = MAPPER.readTree(json).get("userId").asText();
//第一次请求的时候,需要建立WebSocket连接
Channel channel = userChannelMap.get(userId);
if(channel==null){
channel = ctx.channel();
//把连接放在容器中
userChannelMap.put(userId,channel);
}
//只需要完成新消息的提醒即可,只需要获取消息的数量
//获取rabbitMQ的消息内容,并发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
String queueName="article_subcribe_"+userId;
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
int noticeCount=0;
if(queueProperties!=null){
queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//拼接获取队列的名称
String userQueueName="article_thumbup_"+userId;
//获取rabbit的properties容器
Properties userQueueProperties = rabbitAdmin.getQueueProperties(userQueueName);
//获取消息数量
int userNoticeCount=0;
if(userQueueProperties!=null){
userNoticeCount = (int) userQueueProperties.get("QUEUE_MESSAGE_COUNT");
}
//封装返回的数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount", noticeCount);
countMap.put("userNoticeCount", userNoticeCount);
Result result = new Result( "查询成功", countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
//把消息从队列里面清空,否则MQ消息监听器会再次消费一次
if (noticeCount > 0) {
rabbitAdmin.purgeQueue(queueName, true);
}
if(userNoticeCount>0){
rabbitAdmin.purgeQueue(userQueueName,true);
}
//为用户的消息通知队列注册监听器,便于用户在线的时候,
//一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
userNoticeContainer.addQueueNames(userQueueName);
}
}