博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第一章 Netty深入剖析之Netty服务端启动
阅读量:2062 次
发布时间:2019-04-29

本文共 15331 字,大约阅读时间需要 51 分钟。

最近一段时间在学习netty,但是不知道原理用起来总是觉得很懵逼,现在一系列的文章都是在大咖的讲解下结合我自己的理解总结出的一系列分析。汇总下来,有两个目的,一个目的是方便自己日后回顾快速复习,一是希望能对其他小伙伴产生帮助,如果是这样的话,也是级感欣慰,话不多说我们步入正题。

今天这一话的基本目录如下:

目录


 我们建立在一个基本服务端代码上做分析,代码如下:

public final class Server {    public static void main(String[] args) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childOption(ChannelOption.TCP_NODELAY, true)                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")                    .handler(new ServerHandler())                    .childHandler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new AuthHandler()); //.. } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

服务端channel的创建

服务端channel的创建的基本过程,我们从ServerBootstrap的bind方法开始分析,一直往下走直到doBind方法,我们跟进initAndRegister,此处通过channelFactory对象创建channel,现在我们知道是谁以及在哪里创建这个channel了,就转化为了这个工厂是如何创建channel的问题了,通过断点我们知道这个工厂的类为ReflectiveChannelFactory,由于该工厂实现简单,我们全部粘贴出来我们看到实际创建对象就是直接使用反射class.newInstance创建对象的,那么我们现在的问题就转化为了这个class是什么以及何时实例化该工厂并传递clazz的,这个工厂是在哪里创建的并且clazz是什么类型的呢?我们知道创建的channel为NioServerSocketChannel.class,而且我们是在ServerBootstrap.channel(NioServerSocketChannel.class)的时候传递的,那我们进去看看这个方法,我们看到这个方法实例化了我们需要的ChannelFactory,并将NioServerSocketChannel作为参数传递进去了,而channelFactory()方法则是直接将我们实例化的工厂直接赋值给了成员变量.。现在我们知道了channel是在哪里以及如何创建的了,那我们现在进入NioServerSocketChannel的构造函数中去看看channel,我们重点看一下newSocket方法做了什么操作,我们看到下面的代码,创建了一个jdk的channel,到此为止我们channel的创建过程就算是介绍完了。

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            //此处通过channelFactory对象创建channel,现在我们知道是谁以及在哪里创建这个channel了,就转化为了这个工厂是如何创建channel的问题了            channel = channelFactory.newChannel();            init(channel);        } catch (Throwable t) {            ....        }        return regFuture;    }

 

//由于该工厂实现简单,我们全部粘贴出来我们看到实际创建对象就是直接使用反射class.newInstance创建对象的,那么我们现在的问题就转化为了这个class是什么以及何时实例化该工厂并传递clazz的public class ReflectiveChannelFactory
implements ChannelFactory
{ private final Class
clazz; public ReflectiveChannelFactory(Class
clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; }}
public B channel(Class
channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } //我们看到这个方法实例化了我们需要的ChannelFactory,并将NioServerSocketChannel作为参数传递进去了 return channelFactory(new ReflectiveChannelFactory
(channelClass)); } public B channelFactory(ChannelFactory
channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } //此处我们将他直接复制给了上文我们用到的channelFactory this.channelFactory = channelFactory; return (B) this; }

 

public NioServerSocketChannel() {        //我们重点看一下newSocket方法做了什么操作,我们看到下面的代码,创建了一个jdk的ServerSocketChannel         this(newSocket(DEFAULT_SELECTOR_PROVIDER));    } private static ServerSocketChannel newSocket(SelectorProvider provider) {        try {            /**             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.             *             *  See #2308.             */            return provider.openServerSocketChannel();        } catch (IOException e) {            throw new ChannelException(                    "Failed to open a server socket.", e);        }    }
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        //这里我们将jdk本地的channel保存起来        this.ch = ch;        //OP_ACCEPT 保存到readInterestOp         this.readInterestOp = readInterestOp;        try {            ch.configureBlocking(false);        } catch (IOException e) {            try {                ch.close();            } catch (IOException e2) {                if (logger.isWarnEnabled()) {                    logger.warn(                            "Failed to close a partially initialized socket.", e2);                }            }            throw new ChannelException("Failed to enter non-blocking mode.", e);        }    }

 

 

服务端channel 的初始化

我们分析翻channel的创建之后再接着看channel的初始化操作,这部分基本没有难点,但是会有一段代码现在咱们暂时还看不懂,等以后我们在回过头来分析,我们现在只是先简单的粘贴出主代码来。

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel();                //此处进行channel的初始化操作            init(channel);        } catch (Throwable t) {          ...        }       ...        return regFuture;    }
void init(Channel channel) throws Exception {        final Map
, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //这段代码对我们来说有点摸不到头脑,我们以后有了更多的知识储备之后再回过头来分析他 p.addLast(new ChannelInitializer
() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

 

channel注册到selector

首先 调用nioeventloopgroup的register方法进行channel的selector 注册步骤,实际上调用的是MultithreadEventLoopGroup的register方法,这里有两处需要我们进行分析的,一个是next方法的返回值,一个是register做的操作,我们one by one 先从next 进行分析,在next方法中,我们看到最终会调用一个选择器来返回一个EventExecutor 对象,那这个EventExecutor 具体是什么呢,chooser又是怎样获取的呢?这个我们先留一个小尾巴,在我们学习了NioEventLoop之后我们继续分析,我们只需知道他现在返回的是一个NioEventLoop类型的实例。那么我们现在再来分析NioEventLoop.register()方法。实际上调用的是SingleThreadEventLoop类的register方法,进而调用SingleThreadEventLoop类的register方法,最后通过AbstractChannel类,调用 AbstractNioChannel类的doRegister进行channel的注册。如果看着流程有些模糊的话可以看一下下图的堆栈信息或自己动手调试即可。

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel();            init(channel);        } catch (Throwable t) {          ...        }        //调用nioeventloopgroup的register方法进行channel的selector 注册步骤        ChannelFuture regFuture = config().group().register(channel);       ...        return regFuture;    }
//MultithreadEventLoopGroup的register方法,这里有两处需要我们进行分析的,一个是next方法的返回值,一个是register做的操作,我们one by one 先从next 进行分析public ChannelFuture register(Channel channel) {        return next().register(channel);    }
public EventExecutor next() {//我们看到最终会调用一个选择器来返回一个EventExecutor 对象,那这个EventExecutor 具体是什么呢,chooseer又是怎样获取的呢?这个我们先留一个小尾巴,在我们学习了NioEventLoop之后我们继续分析,我们只需知道他现在返回的是一个NioEventLoop类型的实例        return chooser.next();    }

 

//调用到SingleThreadEventLoop类的register方法public ChannelFuture register(Channel channel) {        return register(new DefaultChannelPromise(channel, this));    }
// SingleThreadEventLoop类的register方法 public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        //我们重点分析这个方法        promise.channel().unsafe().register(this, promise);        return promise;    }

 

//AbstractChannel类public final void register(EventLoop eventLoop, final ChannelPromise promise) {           //首先将eventLoop赋值给成员变量            AbstractChannel.this.eventLoop = eventLoop;            //判断是否在同一个线程中            if (eventLoop.inEventLoop()) {                register0(promise);            } else {                try {                    eventLoop.execute(new Runnable() {                        @Override                        public void run() {                            register0(promise);                        }                    });                } catch (Throwable t) {                    ...                }            }        }//最后调用doRegister方法进行注册private void register0(ChannelPromise promise) {            try {                // check if the channel is still open as it could be closed in the mean time when the register                // call was outside of the eventLoop                if (!promise.setUncancellable() || !ensureOpen(promise)) {                    return;                }                boolean firstRegistration = neverRegistered;                doRegister();                neverRegistered = false;                registered = true;                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the                // user may already fire events through the pipeline in the ChannelFutureListener.            //发出相关事件                pipeline.invokeHandlerAddedIfNeeded();                safeSetSuccess(promise);发出相关事件                pipeline.fireChannelRegistered();                // Only fire a channelActive if the channel has never been registered. This prevents firing                // multiple channel actives if the channel is deregistered and re-registered.                if (isActive()) {                    if (firstRegistration) {                        pipeline.fireChannelActive();                    } else if (config().isAutoRead()) {                        // This channel was registered before and autoRead() is set. This means we need to begin read                        // again so that we process inbound data.                        //                        // See https://github.com/netty/netty/issues/4805                        beginRead();                    }                }            } catch (Throwable t) {                // Close the channel directly to avoid FD leak.                closeForcibly();                closeFuture.setClosed();                safeSetFailure(promise, t);            }        }

 

//调用 AbstractNioChannel类的doRegister进行channel的注册protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().selector, 0, this);                return;            } catch (CancelledKeyException e) {                if (!selected) {                    // Force the Selector to select now as the "canceled" SelectionKey may still be                    // cached and not removed because no Select.select(..) operation was called yet.                    eventLoop().selectNow();                    selected = true;                } else {                    // We forced a select operation on the selector before but the SelectionKey is still cached                    // for whatever reason. JDK bug ?                    throw e;                }            }        }    }
doRegister:384, AbstractNioChannel (io.netty.channel.nio)register0:500, AbstractChannel$AbstractUnsafe (io.netty.channel)access$200:419, AbstractChannel$AbstractUnsafe (io.netty.channel)run:478, AbstractChannel$AbstractUnsafe$1 (io.netty.channel)safeExecute$$$capture:163, AbstractEventExecutor (io.netty.util.concurrent)safeExecute:-1, AbstractEventExecutor (io.netty.util.concurrent) - Async stack traceaddTask:-1, SingleThreadEventExecutor (io.netty.util.concurrent)execute:761, SingleThreadEventExecutor (io.netty.util.concurrent)register:475, AbstractChannel$AbstractUnsafe (io.netty.channel)register:80, SingleThreadEventLoop (io.netty.channel)register:74, SingleThreadEventLoop (io.netty.channel)register:85, MultithreadEventLoopGroup (io.netty.channel)initAndRegister:330, AbstractBootstrap (io.netty.bootstrap)doBind:281, AbstractBootstrap (io.netty.bootstrap)bind:277, AbstractBootstrap (io.netty.bootstrap)bind:252, AbstractBootstrap (io.netty.bootstrap)main:36, Server (com.imooc.netty.ch3)

 

服务端口的绑定

分析这个内容是在哪里调用的需要了解netty 的事件传播机制,但是这个我们以后再来分析,我们现在只需要看看调了什么即可.我们重点看doBind()方法的实现.我们查看NioServerSocketChannel的doBind方法做了什么,就是简单的调用jdk 的方法绑定端口,绑定完成后触发fireChannelActive事件,我们互留事件传播直接看最终会调用哪个函数最终会调用AbstractNioChannel的doBeginRead()方法

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {        ...            boolean wasActive = isActive();            try {//进行端口的绑定操作                doBind(localAddress);            } catch (Throwable t) {                safeSetFailure(promise, t);                closeIfClosed();                return;            }            if (!wasActive && isActive()) {                invokeLater(new Runnable() {                    @Override                    public void run() {//如果绑定成功就触发fireChannelActive事件                        pipeline.fireChannelActive();                    }                });            }          ...        }
protected void doBind(SocketAddress localAddress) throws Exception {        if (PlatformDependent.javaVersion() >= 7) {            javaChannel().bind(localAddress, config.getBacklog());        } else {//直接调用jdk的channel绑定到指定的端口上            javaChannel().socket().bind(localAddress, config.getBacklog());        }    }
protected void doBeginRead() throws Exception {        // Channel.read() or ChannelHandlerContext.read() was called        final SelectionKey selectionKey = this.selectionKey;        if (!selectionKey.isValid()) {            return;        }        readPending = true;        final int interestOps = selectionKey.interestOps();        if ((interestOps & readInterestOp) == 0) {//最终会给我们的常量设置为interestOps | readInterestOp,由构造channel部分我们可知这个感兴趣的事件是accept事件            selectionKey.interestOps(interestOps | readInterestOp);        }    }

总结到此为止netty 的服务端的启动过程我们就讲解完毕了,但是在这上面的分析中我们引申出了在注册selector 时候出现选择器chosser 和nioeventloop 以及在端口绑定中引申出来的事件传播机制,这些都是我们欠下的内容,我们争取在后面的章节一一解答。

转载地址:http://mzelf.baihongyu.com/

你可能感兴趣的文章
【JMeter】1.9上考试jmeter测试调试
查看>>
【虫师】【selenium】参数化
查看>>
【Python练习】文件引用用户名密码登录系统
查看>>
学习网站汇总
查看>>
【Python】用Python打开csv和xml文件
查看>>
【Loadrunner】性能测试报告实战
查看>>
【面试】一份自我介绍模板
查看>>
【自动化测试】自动化测试需要了解的的一些事情。
查看>>
【selenium】selenium ide的安装过程
查看>>
【手机自动化测试】monkey测试
查看>>
【英语】软件开发常用英语词汇
查看>>
Fiddler 抓包工具总结
查看>>
【雅思】雅思需要购买和准备的学习资料
查看>>
【雅思】雅思写作作业(1)
查看>>
【雅思】【大作文】【审题作业】关于同不同意的审题作业(重点)
查看>>
【Loadrunner】通过loadrunner录制时候有事件但是白页无法出来登录页怎么办?
查看>>
【English】【托业】【四六级】写译高频词汇
查看>>
【托业】【新东方全真模拟】01~02-----P5~6
查看>>
【托业】【新东方全真模拟】03~04-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST05~06-----P5~6
查看>>