
[3] Netty 4 Source Code Analysis: bind

欧阳高昂
2023-12-01

Reposted from http://xw-z1985.iteye.com/blog/1924124

The previous article analyzed how the listening socket, ServerSocketChannel, is created. This article continues with how the IP address and port are bound.

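Let us return to the doBind logic that was left unfinished. The previous article covered the initAndRegister method called inside doBind. That method eventually triggers the callback of the listener registered on regPromise, and the listener adds the bind task to the boss thread's task queue.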

//AbstractBootstrap  
private ChannelFuture doBind(final SocketAddress localAddress) {  
        final ChannelFuture regPromise = initAndRegister();  
        final Channel channel = regPromise.channel();  
        final ChannelPromise promise = channel.newPromise();  
        if (regPromise.isDone()) {  
            doBind0(regPromise, channel, localAddress, promise);  
        } else {  
            regPromise.addListener(new ChannelFutureListener() {  
                @Override  
                public void operationComplete(ChannelFuture future) throws Exception {  
                    doBind0(future, channel, localAddress, promise);  
                }  
            });  
        }  
        return promise;  
}  
  
private static void doBind0(  
            final ChannelFuture regFuture, final Channel channel,  
            final SocketAddress localAddress, final ChannelPromise promise) {  
  
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up  
        // the pipeline in its channelRegistered() implementation.  
  
        channel.eventLoop().execute(new Runnable() {  
            @Override  
            public void run() {  
                if (regFuture.isSuccess()) {  
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
                } else {  
                    promise.setFailure(regFuture.cause());  
                }  
            }  
        });  
    }  
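
The hand-off from registration to the bind task can be illustrated without Netty. The following is only a minimal sketch using JDK classes: CompletableFuture stands in for regPromise, a single-threaded ExecutorService stands in for the boss event loop, and the class and method names (DoBindSketch, doBind, run) are hypothetical. It is not Netty's implementation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DoBindSketch {
    // Hypothetical stand-in for doBind: the bind task is queued on the loop
    // only after the registration future has completed.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture,
                                            ExecutorService eventLoop,
                                            List<String> log) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // whenComplete runs at once if regFuture is already done, otherwise later
        // as a callback, which covers both branches of the original doBind.
        regFuture.whenComplete((ignored, cause) ->
            eventLoop.execute(() -> {
                if (cause == null) {
                    log.add("bind");
                    promise.complete("bound");
                } else {
                    promise.completeExceptionally(cause);
                }
            }));
        return promise;
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> regFuture = new CompletableFuture<>();
            CompletableFuture<String> promise = doBind(regFuture, eventLoop, log);
            log.add("registered");
            regFuture.complete(null);          // fires the listener
            promise.get(5, TimeUnit.SECONDS);  // wait until the bind task has run
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the bind step can never run before registration completes, which is the ordering guarantee the listener provides in doBind.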

This article analyzes that bind task.

channel.bind(localAddress, promise) calls the bind method of AbstractChannel:

//AbstractChannel  
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {  
        return pipeline.bind(localAddress, promise);  
    }  

pipeline.bind(localAddress, promise) calls the method of DefaultChannelPipeline:

//DefaultChannelPipeline  
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {  
        return tail.bind(localAddress, promise);  
    }  

tail.bind(localAddress, promise) calls the method of DefaultChannelHandlerContext:

//DefaultChannelHandlerContext  
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {  
        if (localAddress == null) {  
            throw new NullPointerException("localAddress");  
        }  
        validatePromise(promise, false);  
        return findContextOutbound().invokeBind(localAddress, promise);  
}  
private DefaultChannelHandlerContext findContextOutbound() {  
        DefaultChannelHandlerContext ctx = this;  
        do {  
            ctx = ctx.prev;  
        } while (!(ctx.handler() instanceof ChannelOutboundHandler));  
        return ctx;  
    }  

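bind is an outbound event, so it travels through the outbound handlers in tail->head order. The pipeline currently holds three handlers: tail -> ServerBootstrapAcceptor -> head. Only head is an outbound handler, so findContextOutbound() returns head. Look at head's invokeBind method: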

// DefaultChannelHandlerContext  
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {  
        EventExecutor executor = executor();  
        if (executor.inEventLoop()) {  
            invokeBind0(localAddress, promise);  
        } else {  
            executor.execute(new Runnable() {  
                @Override  
                public void run() {  
                    invokeBind0(localAddress, promise);  
                }  
            });  
        }  
        return promise;  
}  
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {  
        try {  
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);  
        } catch (Throwable t) {  
            notifyOutboundHandlerException(t, promise);  
        }  
    }  
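
To make the tail->head lookup concrete, here is a minimal sketch of a doubly linked handler chain. The Ctx class, its boolean outbound flag and the link helper are hypothetical simplifications; the real DefaultChannelHandlerContext tests the handler type with instanceof instead.

```java
public class OutboundLookupSketch {
    // Hypothetical simplified context: one node of a doubly linked list.
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Mirrors findContextOutbound(): walk backwards until an outbound handler is found.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Links the contexts in order and returns the last one (the tail).
    static Ctx link(Ctx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
            ctxs[i + 1].prev = ctxs[i];
        }
        return ctxs[ctxs.length - 1];
    }

    // Builds head -> ServerBootstrapAcceptor -> tail and starts the lookup at tail.
    static String bindTarget() {
        Ctx head = new Ctx("head", true);
        Ctx acceptor = new Ctx("ServerBootstrapAcceptor", false);
        Ctx tail = new Ctx("tail", false);
        return link(head, acceptor, tail).findContextOutbound().name;
    }

    public static void main(String[] args) {
        System.out.println(bindTarget()); // prints "head"
    }
}
```

Starting from tail, the walk skips ServerBootstrapAcceptor because it is not an outbound handler and stops at head.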

The line ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise) calls the bind method of HeadHandler:

//HeadHandler  
public void bind(  
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)  
                throws Exception {  
            unsafe.bind(localAddress, promise);  
        }  

HeadHandler in turn calls the bind method of AbstractUnsafe:

//AbstractUnsafe  
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {  
            if (!ensureOpen(promise)) {  
                return;  
            }  
  
            try {  
                boolean wasActive = isActive();  
  
                // See: https://github.com/netty/netty/issues/576  
                if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&  
                    Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&  
                    localAddress instanceof InetSocketAddress &&  
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {  
                    // Warn a user about the fact that a non-root user can't receive a  
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.  
                    logger.warn(  
                            "A non-root user can't receive a broadcast packet if the socket " +  
                            "is not bound to a wildcard address; binding to a non-wildcard " +  
                            "address (" + localAddress + ") anyway as requested.");  
                }  
  
                doBind(localAddress);  
                promise.setSuccess();  
                if (!wasActive && isActive()) {  
                    invokeLater(new Runnable() {  
                        @Override  
                        public void run() {  
                            pipeline.fireChannelActive();  
                        }  
                    });  
                }  
            } catch (Throwable t) {  
                promise.setFailure(t);  
                closeIfClosed();  
            }  
        }  

Because AbstractUnsafe is an inner class of AbstractChannel, doBind(localAddress) resolves to the method of NioServerSocketChannel, a subclass of AbstractChannel:

//NioServerSocketChannel  
protected void doBind(SocketAddress localAddress) throws Exception {  
        javaChannel().socket().bind(localAddress, config.getBacklog());  
    }  
//AbstractUnsafe  
private void invokeLater(Runnable task) {  
            // This method is used by outbound operation implementations to trigger an inbound event later.  
            // They do not trigger an inbound event immediately because an outbound operation might have been  
            // triggered by another inbound event handler method.  If fired immediately, the call stack  
            // will look like this for example:  
            //  
            //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.  
            //   -> handlerA.ctx.close()  
            //      -> channel.unsafe.close()  
            //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet  
            //  
            // which means the execution of two inbound handler methods of the same handler overlap undesirably.  
            eventLoop().execute(task);  
        }  

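Here, at last, is the familiar call to ServerSocket's bind method. With it, the IP address and port are bound. Note: in the Netty version analyzed here, the default value of backlog (the maximum length of the completed-connection queue) is 3072.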
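
For reference, the same JDK call can be exercised on its own. This is a minimal sketch, not Netty code: the class name JdkBindSketch, the loopback address, the ephemeral port 0 and the backlog of 128 are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class JdkBindSketch {
    // Binds a listening socket with an explicit backlog, as doBind does,
    // and returns the port the OS assigned.
    static int bindAndGetPort(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port (an assumption for this sketch).
            ch.socket().bind(new InetSocketAddress("127.0.0.1", 0), backlog);
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindAndGetPort(128));
    }
}
```

The backlog argument is only a hint to the operating system, which may cap or adjust it.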

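Because bind has now been executed, isActive returns true. However, channelActive is an inbound event, so it must not be fired directly from within an outbound operation (the comment in the code above explains why). Instead, the channelActive task is added to the boss thread's task queue. Once the boss thread finishes the bind task, it goes on to run the channelActive task.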
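
The effect of queueing the event instead of firing it inline can be shown with a plain single-threaded executor. This is a minimal sketch under the assumption that the executor behaves like the boss event loop; the class name InvokeLaterSketch and the log labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InvokeLaterSketch {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService boss = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        boss.execute(() -> {
            log.add("bind:start");
            // Like invokeLater: queue the inbound event rather than calling it inline,
            // so it cannot overlap with the task that is currently running.
            boss.execute(() -> {
                log.add("channelActive");
                done.countDown();
            });
            log.add("bind:end");
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            boss.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the executor has a single thread, the queued channelActive step always runs after the bind step has returned, never in the middle of it.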

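channelActive is an inbound event, so it travels through the inbound handlers in head->tail order. The pipeline currently holds three handlers: head -> ServerBootstrapAcceptor -> tail. Both ServerBootstrapAcceptor and tail are inbound handlers. First look at the pipeline's fireChannelActive method, which starts the event at head: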

//DefaultChannelPipeline  
public ChannelPipeline fireChannelActive() {  
        head.fireChannelActive();  
        if (channel.config().isAutoRead()) {  
            channel.read();  
        }  
        return this;  
    }  

The code of head.fireChannelActive() is as follows:

// DefaultChannelHandlerContext  
public ChannelHandlerContext fireChannelActive() {  
        final DefaultChannelHandlerContext next = findContextInbound();  
        EventExecutor executor = next.executor();  
        if (executor.inEventLoop()) {  
            next.invokeChannelActive();  
        } else {  
            executor.execute(new Runnable() {  
                @Override  
                public void run() {  
                    next.invokeChannelActive();  
                }  
            });  
        }  
        return this;  
}  
private DefaultChannelHandlerContext findContextInbound() {  
        DefaultChannelHandlerContext ctx = this;  
        do {  
            ctx = ctx.next;  
        } while (!(ctx.handler() instanceof ChannelInboundHandler));  
        return ctx;  
    }  
private void invokeChannelActive() {  
        try {  
            ((ChannelInboundHandler) handler()).channelActive(this);  
        } catch (Throwable t) {  
            notifyHandlerException(t);  
        }  
    }  

Neither ServerBootstrapAcceptor's nor tail's channelActive method does anything substantial. The event ends at the empty implementation in TailHandler.

Next, look at the channel.read() call that DefaultChannelPipeline makes after head.fireChannelActive() returns.

It calls the following method of AbstractChannel:

//AbstractChannel  
public Channel read() {  
        pipeline.read();  
        return this;  
}  
//DefaultChannelPipeline  
public ChannelPipeline read() {  
        tail.read();  
        return this;  
}  
//DefaultChannelHandlerContext  
public ChannelHandlerContext read() {  
        findContextOutbound().invokeRead();  
        return this;  
    }  

read is an outbound event, so findContextOutbound() searches for outbound handlers in tail->head order. The pipeline currently holds three handlers: tail -> ServerBootstrapAcceptor -> head. Only head is an outbound handler, so look at head's invokeRead method:

private void invokeRead() {  
        EventExecutor executor = executor();  
        if (executor.inEventLoop()) {  
            invokeRead0();  
        } else {  
            Runnable task = invokeRead0Task;  
            if (task == null) {  
                invokeRead0Task = task = new Runnable() {  
                    @Override  
                    public void run() {  
                        invokeRead0();  
                    }  
                };  
            }  
            executor.execute(task);  
        }  
}  
private void invokeRead0() {  
        try {  
            ((ChannelOutboundHandler) handler()).read(this);  
        } catch (Throwable t) {  
            notifyHandlerException(t);  
        }  
    }  

The line ((ChannelOutboundHandler) handler()).read(this) calls the read method of HeadHandler:

//HeadHandler  
public void read(ChannelHandlerContext ctx) {  
            unsafe.beginRead();  
        }  

unsafe.beginRead() calls the beginRead method of AbstractUnsafe:

//AbstractUnsafe  
public void beginRead() {  
            if (!isActive()) {  
                return;  
            }  
            try {  
                doBeginRead();  
            } catch (final Exception e) {  
                invokeLater(new Runnable() {  
                    @Override  
                    public void run() {  
                        pipeline.fireExceptionCaught(e);  
                    }  
                });  
                close(voidPromise());  
            }  
        }  

Because AbstractUnsafe is an inner class of AbstractChannel, doBeginRead() resolves to the method of AbstractNioChannel, a subclass of AbstractChannel:

//AbstractNioChannel  
protected void doBeginRead() throws Exception {  
        if (inputShutdown) {  
            return;  
        }  
  
        final SelectionKey selectionKey = this.selectionKey;  
        if (!selectionKey.isValid()) {  
            return;  
        }  
  
        final int interestOps = selectionKey.interestOps();  
        if ((interestOps & readInterestOp) == 0) {  
            selectionKey.interestOps(interestOps | readInterestOp);  
        }  
    }  

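The value of selectionKey.interestOps() was set earlier, during registration, when the doRegister method (implemented in AbstractNioChannel and invoked by AbstractUnsafe) executed selectionKey = javaChannel().register(eventLoop().selector, 0, this). Its value is therefore 0.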

readInterestOp was set earlier, when the NioServerSocketChannel was created: its constructor calls super(null, newSocket(), SelectionKey.OP_ACCEPT), so the value is 16.

selectionKey.interestOps(interestOps | readInterestOp) therefore sets ops to 16.
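
The arithmetic can be checked directly against the JDK. The sketch below registers a ServerSocketChannel with an interest set of 0 and then applies the same check-and-set as doBeginRead. The class name InterestOpsSketch and the method registerThenBeginRead are hypothetical, and no Netty classes are involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class InterestOpsSketch {
    // Returns { interestOps after registration, interestOps after the update }.
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a selector
            // Register with an empty interest set, as doRegister does.
            SelectionKey key = ch.register(selector, 0, null);
            int before = key.interestOps();
            int readInterestOp = SelectionKey.OP_ACCEPT;
            // Same check-and-set as doBeginRead.
            if ((before & readInterestOp) == 0) {
                key.interestOps(before | readInterestOp);
            }
            return new int[] { before, key.interestOps() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBeginRead();
        System.out.println(ops[0] + " -> " + ops[1]); // prints "0 -> 16"
    }
}
```

SelectionKey.OP_ACCEPT is defined as 1 << 4, which is why OR-ing it into an empty interest set yields 16.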

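Summary:

The following events occur in order: bind (outbound) -> channelActive (inbound) -> read (outbound).

Note: channelActive is triggered from within bind.

The boss thread's task queue evolves as: bind task -> channelActive task

The bind task does the following:

1. Binds the listening socket to the IP address and port, and sets the backlog (the maximum length of the completed-connection queue)

2. Adds the channelActive task to the boss thread's task queue

The channelActive task does the following: through the read it triggers, it sets the interestOps of selectionKey to SelectionKey.OP_ACCEPT, i.e. 16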
