
Getting started with t-io

殷建弼
2023-12-01

1. What is t-io?

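t-io is a networking framework, and in that respect it resembles netty. Beyond that, t-io offers near-ready-made solutions for common network-related workloads (IM, message push, RPC, monitoring) in the form of a rich programming API, which greatly reduces the effort needed at the business layer.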

2. How do you use it? There is a server side and a client side; start with the server.

① Add the Maven dependency

<dependency>
     <groupId>org.t-io</groupId>
     <artifactId>tio-core</artifactId>
     <version>3.0.2.v20180612-RELEASE</version>
</dependency>

② TioServer

If you are not sure where to start, create a TioServer first; its constructor shows what is needed next.

TioServer tioServer = new TioServer(serverGroupContext);

It needs a ServerGroupContext. Same approach: create a ServerGroupContext, and its constructor shows the next step.

ServerGroupContext serverGroupContext = new ServerGroupContext("tio-server", new ServerAioHandler(), new ServerAioListener());

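The constructor takes a String, which is the name of the server, followed by a ServerAioHandler and a ServerAioListener. Both are interfaces that you have to implement yourself.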

First, the ServerAioHandler implementation. Its methods show that it is responsible for encoding and decoding, plus the business logic (the handler method).

package com.xxjsll.server.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;

import java.nio.ByteBuffer;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class MServerAioHandler implements ServerAioHandler {


    private static final Logger logger = LoggerFactory.getLogger(MServerAioHandler.class);

    public Packet decode(ByteBuffer byteBuffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {

        logger.debug("inside decode...");

        if (readableLength < ServerPacket.PACKET_HEADER_LENGTH) {
            return null;
        }

        int bodyLength = byteBuffer.getInt();

        if (bodyLength < 0) {
            throw new AioDecodeException("body length[" + bodyLength + "] is invalid. remote: " + channelContext.getClientNode());
        }

        int len = bodyLength + ServerPacket.PACKET_HEADER_LENGTH;
        if (len > readableLength) {
            return null;
        } else {
            // Read exactly bodyLength bytes; anything after them belongs to the next packet.
            byte[] bytes = new byte[bodyLength];
            byteBuffer.get(bytes);

            ServerPacket serverPacket = new ServerPacket();
            serverPacket.setBody(bytes);
            return serverPacket;
        }
    }

    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        logger.info("inside encode...");

        ServerPacket serverPacket = (ServerPacket) packet;

        byte[] body = serverPacket.getBody();

        int bodyLength = 0;
        if(body != null){
            bodyLength = body.length;
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(bodyLength + ServerPacket.PACKET_HEADER_LENGTH);
        byteBuffer.order(groupContext.getByteOrder());
        byteBuffer.putInt(bodyLength);

        if (body != null) {
            byteBuffer.put(body);
        }

        return byteBuffer;
    }

    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        logger.debug("inside handler...");

        channelContext.setServerNode(new Node("127.0.0.1", ServerPacket.PORT));

        ServerPacket serverPacket = (ServerPacket) packet;

        byte[] body = serverPacket.getBody();
        if(body != null){
            String bodyStr = new String(body, "utf-8");
            ServerPacket serverPacket1 = new ServerPacket();
            serverPacket1.setBody(("receive from ["+ channelContext.getClientNode() + "]: " + bodyStr).getBytes("utf-8"));

            Tio.send(channelContext, serverPacket1);
        }
    }
}
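
The decode and encode methods above rely on a simple wire format: a 4-byte length header followed by the body, with decode returning null while a packet is still incomplete. The following is a minimal standalone sketch of that idea using only java.nio.ByteBuffer. The class LengthPrefixedFrames and its method names are hypothetical and not part of t-io. Unlike the handler above, the sketch flips the buffer after writing, because it reads the buffer back directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of t-io: 4-byte length header followed by the body. */
final class LengthPrefixedFrames {

    static final int HEADER_LENGTH = 4;

    /** Writes the header and the body, then flips the buffer so it is ready for reading. */
    static ByteBuffer encode(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buffer.putInt(body.length);
        buffer.put(body);
        buffer.flip();
        return buffer;
    }

    /**
     * Returns the next body, or null if the buffer does not yet hold a complete frame.
     * When null is returned, the buffer position is left unchanged.
     */
    static String decode(ByteBuffer buffer) {
        if (buffer.remaining() < HEADER_LENGTH) {
            return null; // header not complete yet
        }
        // Absolute get: peek at the length without consuming it.
        int bodyLength = buffer.getInt(buffer.position());
        if (bodyLength < 0) {
            throw new IllegalArgumentException("invalid body length: " + bodyLength);
        }
        if (buffer.remaining() < HEADER_LENGTH + bodyLength) {
            return null; // body not complete yet
        }
        buffer.getInt(); // now consume the header
        byte[] body = new byte[bodyLength];
        buffer.get(body); // read exactly one body, leaving later frames in the buffer
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Reading exactly bodyLength bytes matters when two packets arrive in the same buffer: the second call to decode then finds the next header where it expects it.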

Next is the ServerAioListener implementation, a listener class. The purpose of each method is easy to infer from its name.

package com.xxjsll.server.util;

import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioListener;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class MServerAioListener implements ServerAioListener {
    public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {

    }

    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {

    }

    public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {

    }

    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {

    }

    public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {

    }

    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {

    }
}

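Both the ServerAioHandler and the ServerAioListener implementations refer to a Packet class. This is the class that carries the data, and you extend it to build your own packet type.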

package com.xxjsll.server.util;

import org.tio.core.intf.Packet;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class ServerPacket extends Packet {

    public static final Integer PACKET_HEADER_LENGTH = 4;
    public static final Integer PORT = 8999;

    byte[] body; // payload

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }
}

That essentially completes the server side; all that remains is to start it.

package com.xxjsll.server;

import com.xxjsll.server.util.MServerAioHandler;
import com.xxjsll.server.util.MServerAioListener;
import org.tio.server.ServerGroupContext;
import org.tio.server.TioServer;

import java.io.IOException;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class TIOServer {

    public static void main(String[] args) throws IOException {

        ServerGroupContext serverGroupContext = new ServerGroupContext("tio-server", new MServerAioHandler(), new MServerAioListener());

        TioServer server = new TioServer(serverGroupContext);

        server.start("127.0.0.1", 8999);
    }

}

The server side is done.
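
Because the wire format is just a 4-byte length followed by the body, the running server can in principle be probed from a plain socket, without the t-io client. The sketch below shows the stream-based framing for that. StreamFrames is a hypothetical helper, not part of t-io, and it assumes the server's group context uses big-endian byte order, which is what DataOutputStream and DataInputStream always use.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for talking to the server over a plain socket; not part of t-io. */
final class StreamFrames {

    /** Builds one frame: big-endian 4-byte body length, then the UTF-8 body. */
    static byte[] frame(String text) throws IOException {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(body.length); // DataOutputStream always writes big-endian
        out.write(body);
        out.flush();
        return bytes.toByteArray();
    }

    /** Blocks until one complete frame has been read, then returns its body as text. */
    static String readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int bodyLength = data.readInt();
        if (bodyLength < 0) {
            throw new IOException("invalid body length: " + bodyLength);
        }
        byte[] body = new byte[bodyLength];
        data.readFully(body); // keeps reading until the whole body has arrived
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With a java.net.Socket connected to 127.0.0.1 on port 8999, one would write frame("hello") to the socket's output stream and then call readFrame on its input stream to read the server's reply.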

③ Next is the client, which works much like the server.

There are two interfaces, ClientAioHandler and ClientAioListener, plus a Packet, mirroring the server side.

- The ClientAioHandler implementation.

package com.xxjsll.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;

import java.nio.ByteBuffer;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class MClientAioHander implements ClientAioHandler {

    Logger logger = LoggerFactory.getLogger(MClientAioHander.class);

    @Override
    public Packet heartbeatPacket() {
        return new ClientPacket();
    }

    @Override
    public Packet decode(ByteBuffer byteBuffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {

        if(readableLength < ClientPacket.PACKET_HEADER_LENGTH){
            return null;
        }

        int bodyLength = byteBuffer.getInt();
        if(bodyLength < 0){
            throw new AioDecodeException("body length is invalid. remote: " + channelContext.getServerNode());
        }

        int usefulLength = ClientPacket.PACKET_HEADER_LENGTH + bodyLength;

        if(usefulLength > readableLength){
            return null;
        }else {
            ClientPacket packet = new ClientPacket();
            byte[] body = new byte[bodyLength];
            byteBuffer.get(body);
            packet.setBody(body);

            return packet;
        }
    }

    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {

        ClientPacket clientPacket = (ClientPacket) packet;
        byte[] body = clientPacket.getBody();

        int bodyLength = 0;

        if(body != null){
            bodyLength = body.length;
        }

        int len = ClientPacket.PACKET_HEADER_LENGTH + bodyLength;

        ByteBuffer byteBuffer = ByteBuffer.allocate(len);
        byteBuffer.order(groupContext.getByteOrder());
        byteBuffer.putInt(bodyLength);

        if(body != null){
            byteBuffer.put(body);
        }

        return byteBuffer;
    }

    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {

        ClientPacket clientPacket = (ClientPacket) packet;
        byte[] body = clientPacket.getBody();

        if(body != null){

            String bodyStr = new String(body, "utf-8");
            logger.debug("client received message: " + bodyStr);
        }
    }
}

- The ClientAioListener implementation.

package com.xxjsll.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class MClientAioListener implements ClientAioListener {
    Logger logger = LoggerFactory.getLogger(MClientAioListener.class);
    private static Integer count = 0;

    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {
        logger.info("onAfterConnected!");
    }

    @Override
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {
        logger.info("onAfterDecoded...");
    }

    @Override
    public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {
        logger.info("onAfterReceivedBytes-------------------" + i);
    }

    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {
        logger.info("onAfterSent...");

    }

    @Override
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {
        logger.info("onAfterHandled...");

        ClientPacket clientPacket = (ClientPacket) packet;
        String resData = new String(clientPacket.getBody(), "utf-8");

        logger.info("[" + channelContext.getServerNode() + "]: " + resData);

        count++;

        ((ClientPacket) packet).setBody(("[" + channelContext.getServerNode() + "]: " + count).getBytes("utf-8"));

        Thread.sleep(5000);

        Tio.send(channelContext, packet);
    }

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {

        // throwable may be null when the connection closes without an error
        if (throwable != null) {
            logger.error(throwable.getMessage());
        }
        logger.info(s);

    }
}
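
The listener above counts replies in a static Integer and calls Thread.sleep(5000) inside the callback, which blocks whichever thread invoked it. If callbacks can run on more than one thread, a thread-safe counter and a scheduled task are a safer combination. The following is a minimal sketch of that alternative; DelayedReplies and its methods are hypothetical names, not part of t-io, and the node string in the comment is only an illustrative value.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not part of t-io: counts replies and sends them after a delay. */
final class DelayedReplies {

    private final AtomicInteger count = new AtomicInteger();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Builds the next reply body, e.g. "[127.0.0.1:8999]: 1"; safe to call from several threads. */
    byte[] nextBody(String serverNode) {
        String text = "[" + serverNode + "]: " + count.incrementAndGet();
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Runs the send action after the delay without blocking the calling thread. */
    ScheduledFuture<?> sendLater(Runnable send, long delayMillis) {
        return scheduler.schedule(send, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the scheduler thread so the JVM can exit. */
    void shutdown() {
        scheduler.shutdown();
    }
}
```

Inside onAfterHandled, the sleep and the direct send could then be replaced by a call such as sendLater(() -> Tio.send(channelContext, packet), 5000), with the packet body set from nextBody beforehand.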

- Packet

package com.xxjsll.util;

import org.tio.core.intf.Packet;

/**
 * @author tsinghua
 * @date 2018/6/20
 */
public class ClientPacket extends Packet {

    public static final Integer PACKET_HEADER_LENGTH = 4;

    private byte[] body;

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }
}

Next, send a connection request to the server. The server's replies are handled in the ClientAioListener implementation.

package com.xxjsll;

import com.xxjsll.util.ClientPacket;
import com.xxjsll.util.MClientAioHander;
import com.xxjsll.util.MClientAioListener;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.TioClient;
import org.tio.core.Node;
import org.tio.core.Tio;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args ) throws Exception {

        ClientGroupContext clientGroupContext = new ClientGroupContext(new MClientAioHander(), new MClientAioListener());

        TioClient tioClient = new TioClient(clientGroupContext);

        ClientChannelContext clientChannelContext = tioClient.connect(new Node("127.0.0.1", 8999));

        ClientPacket clientPacket = new ClientPacket();
        clientPacket.setBody("hello,t-tio".getBytes("utf-8"));

        Tio.send(clientChannelContext, clientPacket);
    }
}

④ To sum up: this is only a first, basic use of t-io, essentially a hello world.

