当前位置: 首页 > 工具软件 > Cindy > 使用案例 >

Cindy3.x源码分析

武向文
2023-12-01

 最近看了JE上几篇nio相关的帖子,触发一些对于nio的思考。再一次深入阅读了cindy3.x的源码,并将一些个人心得记录下来。
本文主要探讨cindy与select轮询相关的设计和实现。且主要分析TCP非阻塞模式,不涉及UDP或阻塞模式。
其实几乎所有的nio框架都采用reactor模式,不同之处在于轮询与分发线程的设计。

Reactor相关的接口及实现类
Reactor
-DefaultReactor

ReactorHandler
-ChannelReactorHandler

Session
-AbstractChannelSession
  -SocketChannelSession
  -ServerSocketChannelSession
SessionAcceptor
-AbstractSessionAcceptor
  -NioBlockingSessionAcceptor
  -BlockingSessionAcceptor

DefaultReactor持有一个线程:selectThread,用于select轮询操作。

Java代码
  1. public   void  run() {  
  2.     try  {  
  3.         while  (!close) {  
  4.             beforeSelect();  
  5.             if  (close)  // after beforeSelect, close may be true   
  6.                 break ;  
  7.             try  {  
  8.                 selector.select(SELECT_TIMEOUT);  
  9.             } catch  (IOException e) {  
  10.                 log.error(e, e);  
  11.                 break ;  
  12.             }  
  13.             afterSelect();  
  14.         }  
  15.     } finally  {  
  16.         DefaultReactor.this .stop();  
  17.     }  
  18. }  
                public void run() {
                    try {
                        while (!close) {
                            beforeSelect();
                            if (close) // after beforeSelect, close may be true
                                break;
                            try {
                                selector.select(SELECT_TIMEOUT);
                            } catch (IOException e) {
                                log.error(e, e);
                                break;
                            }
                            afterSelect();
                        }
                    } finally {
                        DefaultReactor.this.stop();
                    }
                }


beforeSelect()
    select()之前对于interestSet的切换准备
afterSelect()
    根据readySet,触发相应的事件,如:OP_READ或OP_WRITE。并调用key所有attach的ReactorHandler中相应的onReable()或onWriteable()方法。

start
通常cindy的启动代码类似如下:

Java代码
  1.           
  2. SessionAcceptor acceptor = SessionFactory  
  3.                 .createSessionAcceptor(SessionType.TCP);  
  4. acceptor.setAcceptorHandler(acceptorHandler);  
  5. acceptor.start();  
        
SessionAcceptor acceptor = SessionFactory
                .createSessionAcceptor(SessionType.TCP);
acceptor.setAcceptorHandler(acceptorHandler);
acceptor.start();



而NioBlockingSessionAcceptor的start()执行过程如下:

Java代码
  1. public   synchronized   void  start() {  
  2.     if  (getAcceptorHandler() ==  null )  
  3.         throw   new  IllegalStateException( "acceptor handler is null" );  
  4.     if  (isStarted())  
  5.         return ;  
  6.   
  7.     ServerSocketChannel channel = null ;  
  8.     try  {  
  9.         channel = ServerSocketChannel.open();  
  10.         setServerSocketOptions(channel.socket());  
  11.         counter.set(0 );  
  12.         session.setChannel(channel);  
  13.         session.start().complete();  
  14.     } catch  (IOException e) {  
  15.         ChannelUtils.close(channel);  
  16.         exceptionCaught(e);  
  17.     }  
  18. }  
    public synchronized void start() {
        if (getAcceptorHandler() == null)
            throw new IllegalStateException("acceptor handler is null");
        if (isStarted())
            return;

        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            setServerSocketOptions(channel.socket());
            counter.set(0);
            session.setChannel(channel);
            session.start().complete();
        } catch (IOException e) {
            ChannelUtils.close(channel);
            exceptionCaught(e);
        }
    }


NioBlockingSessionAcceptor持有一个Session对象:ServerSocketChannelSession,并 在start()中,触发session.start()方法。注意,实际上持有的是ServerSocketChannelSession的一个匿名子 类,并override了buildSession方法。这点非常重要,后面将提到。
ServerSocketChannelSession的start()代码如下(实际是其父类AbstractChannelSession的start())

Java代码
  1. public   synchronized  Future start() {  
  2.     if  (closeFuture !=  null  && !closeFuture.isCompleted())  
  3.         return   new  DefaultFuture( thisfalse );  
  4.     closeFuture = null// then call close will close   
  5.   
  6.     if  (startFuture ==  null ) {  
  7.         try  {  
  8.             doStart();  
  9.         } catch  (IOException e) {  
  10.             dispatchException(e);  
  11.             return   new  DefaultFuture( thisfalse );  
  12.         }  
  13.         startFuture = new  DefaultFuture( this );  
  14.         reactor.register(handler);  
  15.     }  
  16.     return  startFuture;  
  17. }  
    public synchronized Future start() {
        if (closeFuture != null && !closeFuture.isCompleted())
            return new DefaultFuture(this, false);
        closeFuture = null; // then call close will close

        if (startFuture == null) {
            try {
                doStart();
            } catch (IOException e) {
                dispatchException(e);
                return new DefaultFuture(this, false);
            }
            startFuture = new DefaultFuture(this);
            reactor.register(handler);
        }
        return startFuture;
    }


其中,调用了两个重要的函数:doStart()和reactor.register(handler)
doStart()负责一些session的初始化工作,例如,receiveBuffer大小的设置;ServerSocketChannel的open等。
reactor.register向Reactor注册具体的ReactorHandler,而在这里 (ServerSocketChannelSession的start()执行过程中) Reactor将ReactorHandler添加到了registerColl队列中。reactor.register的代码如下:

Java代码
  1. public   void  register(ReactorHandler handler) {  
  2.     if  (Thread.currentThread() == selectThread) {  
  3.         changeRegister(new  Attachment(handler));  
  4.     } else  {  
  5.         registerColl.offer(new  Attachment(handler));  
  6.         start(); // auto start when register   
  7.         selector.wakeup();  
  8.     }  
  9. }  
    public void register(ReactorHandler handler) {
        if (Thread.currentThread() == selectThread) {
            changeRegister(new Attachment(handler));
        } else {
            registerColl.offer(new Attachment(handler));
            start(); // auto start when register
            selector.wakeup();
        }
    }


结合DefaultReactor的分析可知,当selectThread启动以后,执行beforeSelect()时,将对相应的interestSet进行切换处理
Cindy对于interestSet的切换是比较特别的(从代码的注释来看,貌似是为了避免java nio的一个bug)。通常ServerSocketChannel在初始化时,都是直接调用register(selector,OP_ACCEPT)。
然而,DefaultReactor中的初始化代码如下:

Java代码
  1. private   void  changeRegister(Attachment attachment) {  
  2.     ReactorHandler handler = attachment.handler;  
  3.     if  (registered.containsKey(handler))  
  4.         return ;  
  5.     SelectableChannel[] channels = handler.getChannels();  
  6.     try  {  
  7.         for  ( int  i =  0 ; i < channels.length; i++) {  
  8.             SelectableChannel channel = channels[i];  
  9.             channel.configureBlocking(false );  
  10.             int  validOps = channel.validOps();  
  11.             // It's a bug of java nio, see:   
  12.             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4960791   
  13.             boolean  isConnected = (validOps & SelectionKey.OP_CONNECT) !=  0   
  14.                     && ((SocketChannel) channel).isConnected();  
  15.             channel.register(selector, isConnected ? SelectionKey.OP_READ  
  16.                     : (validOps & ~SelectionKey.OP_WRITE), attachment);  
  17.         }  
  18.         registered.put(handler, attachment);  
  19.         handler.onRegistered();  
  20.     } catch  (IOException e) {  
  21.         log.error(e, e);  
  22.         dispatchDeregistered(handler);  
  23.     }  
  24. }  
    private void changeRegister(Attachment attachment) {
        ReactorHandler handler = attachment.handler;
        if (registered.containsKey(handler))
            return;
        SelectableChannel[] channels = handler.getChannels();
        try {
            for (int i = 0; i < channels.length; i++) {
                SelectableChannel channel = channels[i];
                channel.configureBlocking(false);
                int validOps = channel.validOps();
                // It's a bug of java nio, see:
                // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4960791
                boolean isConnected = (validOps & SelectionKey.OP_CONNECT) != 0
                        && ((SocketChannel) channel).isConnected();
                channel.register(selector, isConnected ? SelectionKey.OP_READ
                        : (validOps & ~SelectionKey.OP_WRITE), attachment);
            }
            registered.put(handler, attachment);
            handler.onRegistered();
        } catch (IOException e) {
            log.error(e, e);
            dispatchDeregistered(handler);
        }
    }


如果isConnected=false,则注册除OP_Write之外的所有事件,包括:OP_READ和OP_ACCEPT。而对于isConnected=true的情况就比较疑惑了,难道不注册OP_ACCEPT或OP_CON,就可以直接read了?
此外,通过handler.onRegistered()还触发了filterChain的sessionStart()方法

Acceptor的start过程比较冗杂,概括起来主要执行以下几步

  • 创建ServerSocketChannelSession
  • 进行ServerSocketChannel open,configure等初始化操作
  • 向Reactor注册具体的ReactorHandler(ServerSocketChannelSession的getHandler方法中匿名类形式)
  • Reactor的selectThread在beforeSelect时,向selector注册OP_ACCEPT事件



accept
selectThread进行select操作后,afterSelect中,根据selectionKey的readySet,触发相应的事件。当OP_ACCEPT ready后,调用handler的onAcceptable方法。
此时的handler是ServerSocketChannelSession中定义的匿名类。其onAcceptable方法代码如下:

Java代码
  1. public   void  onAcceptable() {  
  2.     SocketChannel sc = null ;  
  3.     try  {  
  4.         while  ((sc = channel.accept()) !=  null ) {  
  5.             buildSession(sc);  
  6.         }  
  7.         // accept next   
  8.         getReactor().interest(this , Reactor.OP_ACCEPT);  
  9.     } catch  (IOException e) {  
  10.         ChannelUtils.close(sc);  
  11.         dispatchException(e);  
  12.         close();  
  13.     }  
  14. }  
            public void onAcceptable() {
                SocketChannel sc = null;
                try {
                    while ((sc = channel.accept()) != null) {
                        buildSession(sc);
                    }
                    // accept next
                    getReactor().interest(this, Reactor.OP_ACCEPT);
                } catch (IOException e) {
                    ChannelUtils.close(sc);
                    dispatchException(e);
                    close();
                }
            }


其作了两个重要的操作:获取SocketChannel,并创建session;修改该interestSet;

这里创建的session是SocketChannelSession,其中,buildSession的实现代码实际上定义在从上面讲到的NioBlockingSessionAcceptor中。代码如下:

Java代码
  1. protected   void  buildSession(SocketChannel sc) {  
  2.     counter.incrementAndGet();  
  3.     try  {  
  4.         setSocketOptions(sc.socket());  
  5.         SocketChannelSession session = new  SocketChannelSession();  
  6.         session.setChannel(sc);  
  7.         sessionAccepted(session);  
  8.     } catch  (Throwable e) {  
  9.         exceptionCaught(e);  
  10.     }  
  11. }  
        protected void buildSession(SocketChannel sc) {
            counter.incrementAndGet();
            try {
                setSocketOptions(sc.socket());
                SocketChannelSession session = new SocketChannelSession();
                session.setChannel(sc);
                sessionAccepted(session);
            } catch (Throwable e) {
                exceptionCaught(e);
            }
        }


在sessionAccept中,将调用session.start,与上次不同的是,这次start的是 SocketChannelSession。但同ServerSocketChannelSession一样,将向Reactor注册 ReactorHandler(SocketChannelSession中定义的handler匿名子类)
修改interestSet的方法与beforeSelect中的一样,具体操作定义在changeInterest中。
accept的过程概括来讲,就是创建和初始化与socket一一对应的SocketChannelSession;并修改 interestSet,准备accept下一个连接;此外,需要注意的是,只有与ServerSocketChannel对应的key才会注册 accept,换句话说,只有server才会对accept感兴趣。

read
read过程与accept类似,首先,在afterSelect中出发handler.onReadable(),代码如下:

Java代码
  1. public   void  onReadable() {  
  2.     try  {  
  3.         read();  
  4.         reactor.interest(handler, Reactor.OP_READ);  
  5.     } catch  (ClosedChannelException cce) {  
  6.         close();  
  7.     } catch  (Throwable e) {  
  8.         dispatchException(new  SessionException(e));  
  9.         close();  
  10.     }  
  11. }  
        public void onReadable() {
            try {
                read();
                reactor.interest(handler, Reactor.OP_READ);
            } catch (ClosedChannelException cce) {
                close();
            } catch (Throwable e) {
                dispatchException(new SessionException(e));
                close();
            }
        }


SocketChannel对应的key才对read感兴趣,所以handler是SocketChannelSession中定义的handler匿名子类。

Java代码
  1. protected   void  read()  throws  IOException {  
  2.     Buffer buffer = BufferFactory.allocate(getReadPacketSize());  
  3.     int  n = - 1 ;  
  4.     int  readCount =  0 ;  
  5.   
  6.     try  {  
  7.         while  ((n = buffer.read(channel)) >=  0 ) {  
  8.             if  (n ==  0 )  
  9.                 break ;  
  10.             readCount += n;  
  11.         }  
  12.     } catch  (IOException e) {  
  13.         buffer.release();  
  14.         throw  e;  
  15.     }  
  16.   
  17.     if  (readCount >  0 ) {  
  18.         buffer.flip();  
  19.         getSessionFilterChain(false ).packetReceived(  
  20.                 new  DefaultPacket(buffer, address));  
  21.     }  
  22.     if  (n <  0// Connection closed   
  23.         throw   new  ClosedChannelException();  
  24. }  
            protected void read() throws IOException {
                Buffer buffer = BufferFactory.allocate(getReadPacketSize());
                int n = -1;
                int readCount = 0;

                try {
                    while ((n = buffer.read(channel)) >= 0) {
                        if (n == 0)
                            break;
                        readCount += n;
                    }
                } catch (IOException e) {
                    buffer.release();
                    throw e;
                }

                if (readCount > 0) {
                    buffer.flip();
                    getSessionFilterChain(false).packetReceived(
                            new DefaultPacket(buffer, address));
                }
                if (n < 0) // Connection closed
                    throw new ClosedChannelException();
            }


完成数据读取操作,并通知dispatcher触发所有filterChain的packetReceive方法
read过程实际就体现了reactor模式,由selectThread监听并读取数据,再分发给worker线程。

write
write过程与accept和read类似。由Reactor出发具体handler的onWriteable方法。

Java代码
  1. public   void  onWritable() {  
  2.     try  {  
  3.         while  ( true ) {  
  4.             synchronized  (sendQueue) {  
  5.                 if  (currentSendPacket ==  null )  
  6.                     currentSendPacket = (FuturePacket) sendQueue.poll();  
  7.             }  
  8.             if  (currentSendPacket ==  null ) {  
  9.                 reactor.interest(handler, Reactor.OP_NON_WRITE);  
  10.                 return ;  
  11.             }  
  12.   
  13.             try  {  
  14.                 checkSendPacket(currentSendPacket);  
  15.             } catch  (RuntimeException e) {  
  16.                 dispatchException(e);  
  17.                 DefaultFuture future = currentSendPacket.future;  
  18.                 currentSendPacket = null ;  
  19.                 future.setSucceeded(false );  
  20.                 continue ;  
  21.             }  
  22.   
  23.             Buffer buffer = currentSendPacket.getContent();  
  24.             if  (!buffer.hasRemaining() || write(currentSendPacket)) {  
  25.                 buffer.limit(currentSendPacket.limit);  
  26.                 buffer.position(currentSendPacket.position);  
  27.                 buffer.release();  
  28.                 final  FuturePacket packet = currentSendPacket;  
  29.                 currentSendPacket = null ;  
  30.   
  31.                 // keep dispatch order   
  32.                 dispatch(new  Runnable() {  
  33.   
  34.                     public   void  run() {  
  35.                         packet.future.setSucceeded(true );  
  36.                         getSessionFilterChain(true ).packetSent(  
  37.                                 packet.getDelegate());  
  38.                         if  (packet.obj !=  null )  
  39.                             getSessionFilterChain(true ).objectSent(  
  40.                                     packet.obj);  
  41.                     }  
  42.                 });  
  43.             } else  {  
  44.                 reactor.interest(handler, Reactor.OP_WRITE);  
  45.                 return ;  
  46.             }  
  47.         }  
  48.     } catch  (ClosedChannelException cce) {  
  49.         close();  
  50.     } catch  (Throwable e) {  
  51.         dispatchException(new  SessionException(e));  
  52.         close();  
  53.     }  
  54. }  
        public void onWritable() {
            try {
                while (true) {
                    synchronized (sendQueue) {
                        if (currentSendPacket == null)
                            currentSendPacket = (FuturePacket) sendQueue.poll();
                    }
                    if (currentSendPacket == null) {
                        reactor.interest(handler, Reactor.OP_NON_WRITE);
                        return;
                    }

                    try {
                        checkSendPacket(currentSendPacket);
                    } catch (RuntimeException e) {
                        dispatchException(e);
                        DefaultFuture future = currentSendPacket.future;
                        currentSendPacket = null;
                        future.setSucceeded(false);
                        continue;
                    }

                    Buffer buffer = currentSendPacket.getContent();
                    if (!buffer.hasRemaining() || write(currentSendPacket)) {
                        buffer.limit(currentSendPacket.limit);
                        buffer.position(currentSendPacket.position);
                        buffer.release();
                        final FuturePacket packet = currentSendPacket;
                        currentSendPacket = null;

                        // keep dispatch order
                        dispatch(new Runnable() {

                            public void run() {
                                packet.future.setSucceeded(true);
                                getSessionFilterChain(true).packetSent(
                                        packet.getDelegate());
                                if (packet.obj != null)
                                    getSessionFilterChain(true).objectSent(
                                            packet.obj);
                            }
                        });
                    } else {
                        reactor.interest(handler, Reactor.OP_WRITE);
                        return;
                    }
                }
            } catch (ClosedChannelException cce) {
                close();
            } catch (Throwable e) {
                dispatchException(new SessionException(e));
                close();
            }
        }

 

Java代码
  1. protected   boolean  write(Packet packet)  throws  IOException {  
  2.     Buffer buffer = packet.getContent();  
  3.     while  ( true ) {  
  4.         int  n = buffer.write(channel);  
  5.         if  (!buffer.hasRemaining())  
  6.             return   true ;  
  7.         else   if  (n ==  0 ) {  
  8.             // have more data, but the kennel buffer   
  9.             // is full, wait next time to write   
  10.             return   false ;  
  11.         }  
  12.     }  
  13. }  
            protected boolean write(Packet packet) throws IOException {
                Buffer buffer = packet.getContent();
                while (true) {
                    int n = buffer.write(channel);
                    if (!buffer.hasRemaining())
                        return true;
                    else if (n == 0) {
                        // have more data, but the kennel buffer
                        // is full, wait next time to write
                        return false;
                    }
                }
            }


值得注意的是,向key添加OP_WRITE是在应用中调用session.send时完成的。
可见,所有的SocketChannelSession都维持了sendQueue,并在ReactorHandler的onWriteable中完成数据发送。此外,write过程中,还会触发filterChain的packetSend方法

close
当应用程序调用session.close时,将触发所有相关的close操作。代码如下:

Java代码
  1. public   synchronized  Future close()  throws  IllegalStateException {  
  2.     boolean  starting = startFuture !=  null  && !startFuture.isCompleted();  
  3.     if  (closeFuture ==  null ) {  
  4.         if  (!started && !starting) {  
  5.             closeFuture = new  DefaultFuture( thistrue );  
  6.             doClose(); // clear resource even not start   
  7.         } else  {  
  8.             closeFuture = new  DefaultFuture( this );  
  9.             reactor.deregister(handler);  
  10.         }  
  11.     }  
  12.     return  closeFuture;  
  13. }  
    public synchronized Future close() throws IllegalStateException {
        boolean starting = startFuture != null && !startFuture.isCompleted();
        if (closeFuture == null) {
            if (!started && !starting) {
                closeFuture = new DefaultFuture(this, true);
                doClose(); // clear resource even not start
            } else {
                closeFuture = new DefaultFuture(this);
                reactor.deregister(handler);
            }
        }
        return closeFuture;
    }


可见,close方法中,调用了Reactor的deregister方法。ReactorbeforeSelect()时在 dispachDeregistered中,完成相应的关闭操作。包括对key的cancel等,以及通过handler的onDeregistered 方法触发filterChain的sessionClose()。

总的来说,Cindy还是采用了Dispatcher+Worker的reactor模式。Cindy3.x在代码的设计和可读性方面较Cindy2.x有了很大的改进,但个人感觉与mina相比还是有所欠缺。特别是,采用了很多的匿名类来实现接口,影响了代码的可读性。

 类似资料: