当前位置: 首页 > 工具软件 > tio-webpack > 使用案例 >

【tio-websocket】4、tio-websocket-server实现自定义集群模式

万浩淼
2023-12-01

t-io 中已经实现了集群功能,基于 Redis 的发布订阅方式实现的集群。当然,我们也可以自定义集群方式,只要是可以实现发布订阅的组件,基本都可以,例如:rabbitmq,activemq等。本次咱们采用使用比较简单的 activemq

1、引入 maven 依赖

<!-- ActiveMq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

引入了 activemq 的依赖信息

2、开启 ActiveMQ

在项目启动类上加上注解 @EnableJms

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

/**
 * 开启activemq
 */
@EnableJms
@SpringBootApplication
public class TioApplication {

    public static void main(String[] args) {
        SpringApplication.run(TioApplication.class, args);
    }

}

3、ActiveMQ 配置信息

spring:
  jms:
    pub-sub-domain: true
  activemq:
    broker-url: tcp://127.0.0.1:61616
    # 用户、密码
    user: admin
    password: admin
    packages:
      trust-all: true

packages.trust-all 必须配置为 true,不然会报错

4、ActiveMQ 配置类

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Topic;

@Configuration
public class ActiveMqConfig {

    /**
     * 主题
     */
    public static final String TOPICNAME = "tio_ws_spring_boot_starter";

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPICNAME);
    }
}

我们注入了 ActiveMQ 的一个主题,后面我们将基于这个主题实现集群功能

5、集群发布订阅

import com.asurplus.tio.common.activemq.ActiveMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.tio.cluster.TioClusterMessageListener;
import org.tio.cluster.TioClusterTopic;
import org.tio.cluster.TioClusterVo;

import javax.jms.Topic;

/**
 * 发布订阅实现
 */
@Component
public class MyTioClusterTopic implements TioClusterTopic {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Topic topic;

    // 存储tioClusterMessageListener实例信息,消费消息需要调用其onMessage()接口
    private TioClusterMessageListener tioClusterMessageListener;

    public void addMessageListener(TioClusterMessageListener tioClusterMessageListener) {
        this.tioClusterMessageListener = tioClusterMessageListener;
    }

    /**
     * 集群消息会从此处发送至mq
     *
     * @param tioClusterVo
     */
    @Override
    public void publish(TioClusterVo tioClusterVo) {
        jmsMessagingTemplate.convertAndSend(topic, tioClusterVo);
    }

    /**
     * 监听消息
     *
     * @param tioClusterVo
     */
    @JmsListener(destination = ActiveMqConfig.TOPICNAME)
    public void receive(TioClusterVo tioClusterVo) {
        if (tioClusterMessageListener != null) {
            // 将消费的消息发送至onMessage方法中即可
            this.tioClusterMessageListener.onMessage(ActiveMqConfig.TOPICNAME, tioClusterVo);
        }
    }
}

核心就是设置一个监听器,当收到消息的时候,发布到 activemq,被其监听到后再进行具体的业务逻辑

6、集群配置

import com.asurplus.tio.websocket.handle.MyWsMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.cluster.TioClusterConfig;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.WsServerStarter;

import java.io.IOException;

@Configuration
public class WebSocketConfig {

    @Autowired
    private MyTioClusterTopic myTioClusterTopic;
    @Autowired
    private MyWsMsgHandler myWsMsgHandler;

    /**
     * TIO-WEBSOCKET 配置信息
     */
    public static ServerTioConfig serverTioConfig;

    /**
     * 集群配置
     *
     * @return
     */
    @Bean
    public TioClusterConfig tioClusterConfig() {
        // 设置Tio集群配置,设置集群消息处理器
        TioClusterConfig clusterConfig = new TioClusterConfig(myTioClusterTopic);
        // 所有连接是否集群(同一个ip是否会分布在不同的机器上),false:不集群,默认集群
        clusterConfig.setCluster4all(true);
        // bsid是否集群(在A机器上的客户端是否可以通过bsid发消息给B机器上的客户端),false:不集群,默认集群
        clusterConfig.setCluster4bsId(true);
        // id是否集群(在A机器上的客户端是否可以通过channelId发消息给B机器上的客户端),false:不集群,默认集群
        clusterConfig.setCluster4channelId(true);
        // 群组是否集群(同一个群组是否会分布在不同的机器上),false:不集群,默认不集群
        clusterConfig.setCluster4group(true);
        // ip是否集群(同一个ip是否会分布在不同的机器上),false:不集群,默认集群
        clusterConfig.setCluster4ip(true);
        // 用户是否集群(同一个用户是否会分布在不同的机器上),false:不集群,默认集群
        clusterConfig.setCluster4user(true);
        return clusterConfig;
    }

    /**
     * 启动类配置
     *
     * @return
     * @throws IOException
     */
    @Bean
    public WsServerStarter wsServerStarter() throws IOException {
        // 设置处理器
        WsServerStarter wsServerStarter = new WsServerStarter(6789, myWsMsgHandler);
        // 获取到ServerTioConfig
        serverTioConfig = wsServerStarter.getServerTioConfig();
        // 设置超时时间
        serverTioConfig.setHeartbeatTimeout(600000);
        // 设置集群配置文件
        serverTioConfig.setTioClusterConfig(tioClusterConfig());
        // 启动
        wsServerStarter.start();
        return wsServerStarter;
    }
}

t-io 的集群是否开启,取决于 TioClusterConfig 是否配置,现在我们的集群功能就配置好了

7、打包测试

我们需要打两个包进行测试,使用两个不同的 id 分别登录两台不同的 websocket 服务器,看其是否能正常收发消息

测试过程,略…

本人亲自测试可行

如您在阅读中发现不足,欢迎留言!!!

 类似资料: