本文共 15331 字,大约阅读时间需要 51 分钟。
最近一段时间在学习netty,但是不知道原理用起来总是觉得很懵逼,现在一系列的文章都是在大咖的讲解下结合我自己的理解总结出的一系列分析。汇总下来,有两个目的,一个目的是方便自己日后回顾快速复习,一是希望能对其他小伙伴产生帮助,如果是这样的话,也是级感欣慰,话不多说我们步入正题。
今天这一话的基本目录如下:
目录
我们建立在一个基本服务端代码上做分析,代码如下:
public final class Server { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true) .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") .handler(new ServerHandler()) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new AuthHandler()); //.. } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
服务端channel的创建的基本过程,我们从ServerBootstrap的bind方法开始分析,一直往下走直到doBind方法,我们跟进initAndRegister,此处通过channelFactory对象创建channel,现在我们知道是谁以及在哪里创建这个channel了,就转化为了这个工厂是如何创建channel的问题了,通过断点我们知道这个工厂的类为ReflectiveChannelFactory,由于该工厂实现简单,我们全部粘贴出来我们看到实际创建对象就是直接使用反射class.newInstance创建对象的,那么我们现在的问题就转化为了这个class是什么以及何时实例化该工厂并传递clazz的,这个工厂是在哪里创建的并且clazz是什么类型的呢?我们知道创建的channel为NioServerSocketChannel.class,而且我们是在ServerBootstrap.channel(NioServerSocketChannel.class)的时候传递的,那我们进去看看这个方法,我们看到这个方法实例化了我们需要的ChannelFactory,并将NioServerSocketChannel作为参数传递进去了,而channelFactory()方法则是直接将我们实例化的工厂直接赋值给了成员变量.。现在我们知道了channel是在哪里以及如何创建的了,那我们现在进入NioServerSocketChannel的构造函数中去看看channel,我们重点看一下newSocket方法做了什么操作,我们看到下面的代码,创建了一个jdk的channel,到此为止我们channel的创建过程就算是介绍完了。
final ChannelFuture initAndRegister() { Channel channel = null; try { //此处通过channelFactory对象创建channel,现在我们知道是谁以及在哪里创建这个channel了,就转化为了这个工厂是如何创建channel的问题了 channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { .... } return regFuture; }
//由于该工厂实现简单,我们全部粘贴出来我们看到实际创建对象就是直接使用反射class.newInstance创建对象的,那么我们现在的问题就转化为了这个class是什么以及何时实例化该工厂并传递clazz的public class ReflectiveChannelFactoryimplements ChannelFactory { private final Class clazz; public ReflectiveChannelFactory(Class clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; }}
public B channel(Class channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } //我们看到这个方法实例化了我们需要的ChannelFactory,并将NioServerSocketChannel作为参数传递进去了 return channelFactory(new ReflectiveChannelFactory(channelClass)); } public B channelFactory(ChannelFactory channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } //此处我们将他直接复制给了上文我们用到的channelFactory this.channelFactory = channelFactory; return (B) this; }
public NioServerSocketChannel() { //我们重点看一下newSocket方法做了什么操作,我们看到下面的代码,创建了一个jdk的ServerSocketChannel this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See #2308. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //这里我们将jdk本地的channel保存起来 this.ch = ch; //OP_ACCEPT 保存到readInterestOp this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
我们分析翻channel的创建之后再接着看channel的初始化操作,这部分基本没有难点,但是会有一段代码现在咱们暂时还看不懂,等以后我们在回过头来分析,我们现在只是先简单的粘贴出主代码来。
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); //此处进行channel的初始化操作 init(channel); } catch (Throwable t) { ... } ... return regFuture; }
void init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } final Map , Object> attrs = attrs0(); synchronized (attrs) { for (Entry , Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
首先 调用nioeventloopgroup的register方法进行channel的selector 注册步骤,实际上调用的是MultithreadEventLoopGroup的register方法,这里有两处需要我们进行分析的,一个是next方法的返回值,一个是register做的操作,我们one by one 先从next 进行分析,在next方法中,我们看到最终会调用一个选择器来返回一个EventExecutor 对象,那这个EventExecutor 具体是什么呢,chooser又是怎样获取的呢?这个我们先留一个小尾巴,在我们学习了NioEventLoop之后我们继续分析,我们只需知道他现在返回的是一个NioEventLoop类型的实例。那么我们现在再来分析NioEventLoop.register()方法。实际上调用的是SingleThreadEventLoop类的register方法,进而调用SingleThreadEventLoop类的register方法,最后通过AbstractChannel类,调用 AbstractNioChannel类的doRegister进行channel的注册。如果看着流程有些模糊的话可以看一下下图的堆栈信息或自己动手调试即可。
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { ... } //调用nioeventloopgroup的register方法进行channel的selector 注册步骤 ChannelFuture regFuture = config().group().register(channel); ... return regFuture; }
//MultithreadEventLoopGroup的register方法,这里有两处需要我们进行分析的,一个是next方法的返回值,一个是register做的操作,我们one by one 先从next 进行分析public ChannelFuture register(Channel channel) { return next().register(channel); }
public EventExecutor next() {//我们看到最终会调用一个选择器来返回一个EventExecutor 对象,那这个EventExecutor 具体是什么呢,chooseer又是怎样获取的呢?这个我们先留一个小尾巴,在我们学习了NioEventLoop之后我们继续分析,我们只需知道他现在返回的是一个NioEventLoop类型的实例 return chooser.next(); }
//调用到SingleThreadEventLoop类的register方法public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
// SingleThreadEventLoop类的register方法 public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //我们重点分析这个方法 promise.channel().unsafe().register(this, promise); return promise; }
//AbstractChannel类public final void register(EventLoop eventLoop, final ChannelPromise promise) { //首先将eventLoop赋值给成员变量 AbstractChannel.this.eventLoop = eventLoop; //判断是否在同一个线程中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ... } } }//最后调用doRegister方法进行注册private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. //发出相关事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise);发出相关事件 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
//调用 AbstractNioChannel类的doRegister进行channel的注册protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
doRegister:384, AbstractNioChannel (io.netty.channel.nio)register0:500, AbstractChannel$AbstractUnsafe (io.netty.channel)access$200:419, AbstractChannel$AbstractUnsafe (io.netty.channel)run:478, AbstractChannel$AbstractUnsafe$1 (io.netty.channel)safeExecute$$$capture:163, AbstractEventExecutor (io.netty.util.concurrent)safeExecute:-1, AbstractEventExecutor (io.netty.util.concurrent) - Async stack traceaddTask:-1, SingleThreadEventExecutor (io.netty.util.concurrent)execute:761, SingleThreadEventExecutor (io.netty.util.concurrent)register:475, AbstractChannel$AbstractUnsafe (io.netty.channel)register:80, SingleThreadEventLoop (io.netty.channel)register:74, SingleThreadEventLoop (io.netty.channel)register:85, MultithreadEventLoopGroup (io.netty.channel)initAndRegister:330, AbstractBootstrap (io.netty.bootstrap)doBind:281, AbstractBootstrap (io.netty.bootstrap)bind:277, AbstractBootstrap (io.netty.bootstrap)bind:252, AbstractBootstrap (io.netty.bootstrap)main:36, Server (com.imooc.netty.ch3)
分析这个内容是在哪里调用的需要了解netty 的事件传播机制,但是这个我们以后再来分析,我们现在只需要看看调了什么即可.我们重点看doBind()方法的实现.我们查看NioServerSocketChannel的doBind方法做了什么,就是简单的调用jdk 的方法绑定端口,绑定完成后触发fireChannelActive事件,我们互留事件传播直接看最终会调用哪个函数最终会调用AbstractNioChannel的doBeginRead()方法
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try {//进行端口的绑定操作 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() {//如果绑定成功就触发fireChannelActive事件 pipeline.fireChannelActive(); } }); } ... }
protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else {//直接调用jdk的channel绑定到指定的端口上 javaChannel().socket().bind(localAddress, config.getBacklog()); } }
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) {//最终会给我们的常量设置为interestOps | readInterestOp,由构造channel部分我们可知这个感兴趣的事件是accept事件 selectionKey.interestOps(interestOps | readInterestOp); } }
总结到此为止netty 的服务端的启动过程我们就讲解完毕了,但是在这上面的分析中我们引申出了在注册selector 时候出现选择器chosser 和nioeventloop 以及在端口绑定中引申出来的事件传播机制,这些都是我们欠下的内容,我们争取在后面的章节一一解答。
转载地址:http://mzelf.baihongyu.com/