t-io 中已经实现了集群功能,基于 Redis 的发布订阅方式实现的集群。当然,我们也可以自定义集群方式,只要是可以实现发布订阅的组件,基本都可以,例如:rabbitmq,activemq等。本次咱们采用使用比较简单的 activemq
<!-- ActiveMq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
引入了 activemq 的依赖信息
在项目启动类上加上注解 @EnableJms
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
/**
* 开启activemq
*/
@EnableJms
@SpringBootApplication
public class TioApplication {
public static void main(String[] args) {
SpringApplication.run(TioApplication.class, args);
}
}
spring:
jms:
pub-sub-domain: true
activemq:
broker-url: tcp://127.0.0.1:61616
# 用户、密码
user: admin
password: admin
packages:
trust-all: true
packages.trust-all 必须配置为 true,不然会报错
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Topic;
@Configuration
public class ActiveMqConfig {
/**
* 主题
*/
public static final String TOPICNAME = "tio_ws_spring_boot_starter";
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPICNAME);
}
}
我们注入了 ActiveMQ 的一个主题,后面我们将基于这个主题实现集群功能
import com.asurplus.tio.common.activemq.ActiveMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.tio.cluster.TioClusterMessageListener;
import org.tio.cluster.TioClusterTopic;
import org.tio.cluster.TioClusterVo;
import javax.jms.Topic;
/**
* 发布订阅实现
*/
@Component
public class MyTioClusterTopic implements TioClusterTopic {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
// 存储tioClusterMessageListener实例信息,消费消息需要调用其onMessage()接口
private TioClusterMessageListener tioClusterMessageListener;
public void addMessageListener(TioClusterMessageListener tioClusterMessageListener) {
this.tioClusterMessageListener = tioClusterMessageListener;
}
/**
* 集群消息会从此处发送至mq
*
* @param tioClusterVo
*/
@Override
public void publish(TioClusterVo tioClusterVo) {
jmsMessagingTemplate.convertAndSend(topic, tioClusterVo);
}
/**
* 监听消息
*
* @param tioClusterVo
*/
@JmsListener(destination = ActiveMqConfig.TOPICNAME)
public void receive(TioClusterVo tioClusterVo) {
if (tioClusterMessageListener != null) {
// 将消费的消息发送至onMessage方法中即可
this.tioClusterMessageListener.onMessage(ActiveMqConfig.TOPICNAME, tioClusterVo);
}
}
}
核心就是设置一个监听器,当收到消息的时候,发布到 activemq,被其监听到后再进行具体的业务逻辑
import com.asurplus.tio.websocket.handle.MyWsMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.cluster.TioClusterConfig;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.WsServerStarter;
import java.io.IOException;
@Configuration
public class WebSocketConfig {
@Autowired
private MyTioClusterTopic myTioClusterTopic;
@Autowired
private MyWsMsgHandler myWsMsgHandler;
/**
* TIO-WEBSOCKET 配置信息
*/
public static ServerTioConfig serverTioConfig;
/**
* 集群配置
*
* @return
*/
@Bean
public TioClusterConfig tioClusterConfig() {
// 设置Tio集群配置,设置集群消息处理器
TioClusterConfig clusterConfig = new TioClusterConfig(myTioClusterTopic);
// 所有连接是否集群(同一个ip是否会分布在不同的机器上),false:不集群,默认集群
clusterConfig.setCluster4all(true);
// bsid是否集群(在A机器上的客户端是否可以通过bsid发消息给B机器上的客户端),false:不集群,默认集群
clusterConfig.setCluster4bsId(true);
// id是否集群(在A机器上的客户端是否可以通过channelId发消息给B机器上的客户端),false:不集群,默认集群
clusterConfig.setCluster4channelId(true);
// 群组是否集群(同一个群组是否会分布在不同的机器上),false:不集群,默认不集群
clusterConfig.setCluster4group(true);
// ip是否集群(同一个ip是否会分布在不同的机器上),false:不集群,默认集群
clusterConfig.setCluster4ip(true);
// 用户是否集群(同一个用户是否会分布在不同的机器上),false:不集群,默认集群
clusterConfig.setCluster4user(true);
return clusterConfig;
}
/**
* 启动类配置
*
* @return
* @throws IOException
*/
@Bean
public WsServerStarter wsServerStarter() throws IOException {
// 设置处理器
WsServerStarter wsServerStarter = new WsServerStarter(6789, myWsMsgHandler);
// 获取到ServerTioConfig
serverTioConfig = wsServerStarter.getServerTioConfig();
// 设置超时时间
serverTioConfig.setHeartbeatTimeout(600000);
// 设置集群配置文件
serverTioConfig.setTioClusterConfig(tioClusterConfig());
// 启动
wsServerStarter.start();
return wsServerStarter;
}
}
t-io 的集群是否开启,取决于 TioClusterConfig 是否配置,现在我们的集群功能就配置好了
我们需要打两个包进行测试,使用两个不同的 id 分别登录两台不同的 websocket 服务器,看其是否能正常收发消息
测试过程,略…
本人亲自测试可行
如您在阅读中发现不足,欢迎留言!!!