Netty原理笔记

分类专栏:
Netty相关

文章标签:
Netty
原创

本文转载自渡一教育

Netty原理

一.ByteBuf

NIO中ByteBuffer的缺点:

1)长度固定,无法动态的扩容和缩容,缺乏灵活性
2) 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
3)功能有限,使用过程中往往需要自行封装

1.分类

按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer

1)heap buffer(堆内存缓冲区)

将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输

2)direct buffer(直接内存缓冲区)

不在堆中,而是使用了操作系统的本地内存
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高
缺点:释放和分配的空间更昂贵,使用时需要更谨慎

3)composite buffer(复合内存缓冲区)

将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作
应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
public class TypeTest {

    public static void main(String[] args) {

        ByteBuf buf = Unpooled.copiedBuffer("hello bytebuf", CharsetUtil.UTF_8);
        //堆内存
        if (buf.hasArray()) {
            System.out.println("堆内存缓冲区");
            System.out.println(new String(buf.array()));
        }
        //堆内存缓存区
        ByteBuf buf1 = Unpooled.buffer();
        //直接内存缓存区
        ByteBuf buf2 = Unpooled.directBuffer();
        //复合内存缓存区
        CompositeByteBuf buf3 = Unpooled.compositeBuffer();
        buf3.addComponents(buf1, buf2);
//        buf3.removeComponent(0);
        Iterator<ByteBuf> iterator = buf3.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }
}

在这里插入图片描述

4)池化

对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂
Unpooled正是非池化缓冲区的工具类
主要区别在于,池化的内存由netty管理,非池化的内存由GC回收

5)回收方式

回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收
弊端:可能引发内存泄漏
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样检查
public class RefCntTest {

    public static void main(String[] args) {

        ByteBuf buf = Unpooled.buffer(10);
        System.out.println(buf);
        // 引用计数的值
        System.out.println(buf.refCnt());
        // 保持的意思  计数+1
        buf.retain();
        System.out.println(buf.refCnt());
        // 释放的意思  计数-1
        buf.release();
        System.out.println(buf.refCnt());
        buf.release();
        System.out.println(buf);
    }
}

在这里插入图片描述

2.工作原理

和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex

1) 读写分离

在这里插入图片描述 在这里插入图片描述

当执行clear()方法时,索引位置清空回初始位置,但数据保持不变
mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex
public class IndexTest {

    public static void main(String[] args) {

        ByteBuf buf = Unpooled.buffer();
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("--------写入hello index");
        buf.writeBytes("hello index".getBytes());
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("readableBytes: " + buf.readableBytes());
        System.out.println("--------读取hello");

        for (int i = 0; i < 5; i++) {
            System.out.print((char) buf.readByte());
        }

        System.out.println();
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("readableBytes: " + buf.readableBytes());
        System.out.println("--------读取index 并回退");
        buf.markReaderIndex();
//        buf.markWriterIndex();
        int end = buf.writerIndex();

        for (int i = buf.readerIndex(); i < end; i++) {
            System.out.print((char) buf.readByte());
        }

        System.out.println();
        // 撤回到mark标记的位置
        buf.resetReaderIndex();
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("readableBytes: " + buf.readableBytes());
        // 回收可废弃空间
        buf.discardReadBytes();
        System.out.println("--------回收可废弃空间");
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("readableBytes: " + buf.readableBytes());
        buf.clear();
        System.out.println("--------调用clear方法");
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        System.out.println("readableBytes: " + buf.readableBytes());
    }
}

在这里插入图片描述 在这里插入图片描述

2)深浅拷贝

浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响
深拷贝,拷贝的是整个对象,和原对象之间完全独立
duplicate和slice方法,达成全部浅拷贝和部分浅拷贝
copy,部分深拷贝,部分代表的是可读空间
public class CopyTest {

    public static void main(String[] args) {

        ByteBuf buf = Unpooled.buffer();
        buf.writeBytes("hello bytebuf copy".getBytes());
        System.out.println("----------初始化bytebuf");
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        // 浅复制 / 浅拷贝
        ByteBuf newBuf = buf.duplicate();
        System.out.println("----------duplicate newBuf");
        System.out.println("newBuf capacity: " + newBuf.capacity());
        System.out.println("newBuf readerIndex: " + newBuf.readerIndex());
        System.out.println("newBuf writerIndex: " + newBuf.writerIndex());
        newBuf.writeBytes(" from newBuf".getBytes());
        System.out.println("----------duplicate add data");
        System.out.println("newBuf writerIndex: " + newBuf.writerIndex());
        // 如果读取大小 超过写索引  会报错IndexOutOfBoundsException
        buf.writerIndex(30);

        for (int i = 0; i < 13; i++) {
            System.out.print((char) buf.readByte());
        }

        System.out.println();
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        // 部分浅拷贝切片的意思
        // 拷贝的区间是readerIndex - writerIndex之间的区域
        // 只可读,不可写
        // 切片的容量就是原可读区域的大小 writerIndex - readerIndex的值
        ByteBuf sliceBuf = buf.slice();
        // 写数据会报错  IndexOutOfBoundsException
//        sliceBuf.writeBytes(" sth".getBytes());
        System.out.println("-----------sliceBuf");
        System.out.println("capacity: " + sliceBuf.capacity());
        System.out.println("readerIndex: " + sliceBuf.readerIndex());
        System.out.println("writerIndex: " + sliceBuf.writerIndex());
        // 深复制
        ByteBuf copyBuf = buf.copy();
        System.out.println("-----------copyBuf");
        System.out.println("capacity: " + copyBuf.capacity());
        System.out.println("readerIndex: " + copyBuf.readerIndex());
        System.out.println("writerIndex: " + copyBuf.writerIndex());
        copyBuf.writeBytes(" from copyBuf".getBytes());
        copyBuf.writerIndex(43);

        for (int i = copyBuf.readerIndex(); i < 43; i++) {
            System.out.print((char) copyBuf.readByte());
        }

        System.out.println();
        System.out.println("-----------原buf");
        buf.writerIndex(43);

        for (int i = buf.readerIndex(); i < 43; i++) {
            System.out.print((char) buf.readByte());
        }
    }
}

在这里插入图片描述 在这里插入图片描述

3.扩容机制

1)ByteBuffer的存储

ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("pz".getBytes());
-----------------------------------------------------------
 //底层原理:
    
 public final ByteBuffer put(byte[] src) {
     return put(src, 0, src.length);
 }
    
    //额外接收偏移量(存储数据的起始位置)和数据长度
    public ByteBuffer put(byte[] src, int offset, int length) {
        //校验参数的有效性
        checkBounds(offset, length, src.length);
        //如果要存储数据的长度 > 剩余可用空间,抛出buffer越界的异常
        if (length > remaining())
            throw new BufferOverflowException();
        //如果剩余空间足够,计算存储的结束位置 = 偏移量 + 数据长度    
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }    
手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够
如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据

2) ByteBuffer的存储和扩容

当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"
public class CapacityTest {

    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("capacity: " + buf.capacity());
        for (int i = 0; i < 11; i++) {
            buf.writeByte(i);
        }
        System.out.println("capacity: " + buf.capacity());
        for (int i = 0; i < 65; i++) {
            buf.writeByte(i);
        }
        System.out.println("capacity: " + buf.capacity());
    }
}

在这里插入图片描述

//查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现   

//ByteBuf类
public abstract ByteBuf writeByte(int value);


//AbstractByteBuf子类

    @Override
    public ByteBuf writeByte(int value) {
        // 确保可写空间足够
        ensureWritable0(1);
        // 写入数据
        _setByte(writerIndex++, value);
        return this;
    }
    
    // 参数为 最小写入数据的大小
    final void ensureWritable0(int minWritableBytes) {
        final int writerIndex = writerIndex();
        // 目标容量 = 当前写操作索引 + 最小写入数据大小
        final int targetCapacity = writerIndex + minWritableBytes;
        // 容量足够  不需扩容
        if (targetCapacity <= capacity()) {
            ensureAccessible();
            return;
        }
        // 容量不足时 如果目标容量 超出最大容量  抛出异常
        if (checkBounds && targetCapacity > maxCapacity) {
            ensureAccessible();
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
		
		// 扩容逻辑
        // 获取可写空间大小
        final int fastWritable = maxFastWritableBytes();
        // 如果 可写空间 >= 所需空间   新的容量=写操作索引+可写空间大小 
        // 如果 可写空间 < 所需空间   计算要扩容的新容量大小 calculateNewCapacity方法
        int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
                : alloc().calculateNewCapacity(targetCapacity, maxCapacity);
        
        // Adjust to the new capacity.
        
        // 计算完成后 生成新的ByteBuffer
        capacity(newCapacity);
    }
    
    // 获取可写空间大小
    public int maxFastWritableBytes() {
        return writableBytes();
    }
    

//AbstractByteBufAllocator子类

    // 计算要扩容的新容量大小
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // 校验参数有效性
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        // 扩容方式的分界点 以4M大小为界
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.、
        // 如果所需容量大于4M  按照步进的方式扩容 
        //   举例: 比如 minNewCapacity = 5M  
        if (minNewCapacity > threshold) {
            // newCapacity = 5 / 4 * 4 = 4M  确保是4的倍数
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                // newCapacity = 4 + 4 = 8M;
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        // 如果所需容量大于4M  按照64的倍数扩容  找到最接近所需容量的64的倍数
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        
        // 保障在最大可接受容量范围内
        return Math.min(newCapacity, maxCapacity);
    }

3)优势

池化的方式提高内存使用率
提出了复合型缓冲区的整合方案
增加了索引,使读写分离,使用更便捷
解决了ByteBuffer长度固定的问题,增加了扩容机制
用引用计数的方式进行对象回收

二.channel

channel是通讯的载体,对应通讯的一端,在BIO中对应Socket,NIO中对应SocketChannel,Netty中对应NioSocketChannel,ServerSocket同理
channelhandler是通道的处理器,一个channel往往有多个handler
channelpipeline是handler的容器,装载并管理handler的顺序(本质是双向链表)

在这里插入图片描述

如图,channel创建时,会对应创建一个channelpipeline,pipeline首先会记录一个头部的处理器handler,当pipeline进行分发时,先分发给头部,然后依次执行,执行handler全部执行完成
同时,channel创建后,会注册进EventLoop之中,EventLoop会监听事件的发生。不同的事件调用handler不同的处理方法,让流程运转起来。

channel生命周期,对应四种状态,分别为:

1) ChannelUnregistered 已创建但还未被注册到监听器中
2) ChannelRegistered 已注册到监听器EventLoop中
3) ChannelActive 连接完成处于活跃状态,此时可以接收和发送数据
4) ChannelInactive 非活跃状态,代表连接未建立或者已断开

channelhandler生命周期,对应三种状态,分别为:

1) handlerAdded 把handler添加到pipeline之中
2) handlerRemoved 从pipeline中移除
3) exceptionCaught 在处理过程中有错误产生

创建channel源码分析

//以服务端启动为例
ChannelFuture future = serverBootstrap.bind(8888).sync();

//参数设置
serverBootstrap.channel(NioServerSocketChannel.class)

//AbstractBootstrap 启动对象的父类


    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .......
    }
    
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } 
        .......
    }
   
   
 //ReflectiveChannelFactory  工厂实现类
 
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

在启动对象调用bind()或connect()方法时,会创建channel
本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过channel参数来设置的

ChannelHandler

类层次关系图

在这里插入图片描述

入站和出站:

从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念
从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站
不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行
责任链模式——当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递
适配器模式——出国时要使用的电源转换器(美国/日本110V 中国220V电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口

ChannelDuplexHandler是除了入站和出站handler之外的,另一个常用子类

它同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,如果需要既处理入站事件又处理出站事件,可以继承此类
 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
 
 ------------------------------------------------------------------
 public class LoggingHandler extends ChannelDuplexHandler{}
 
 ------------------------------------------------------------------
 public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
 
  ------------------------------------------------------------------
  public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}

ChannelHandlerAdapter

提供了额外的isSharable()方法,用来判断handler是否可以被共享到多个pipeline之中
默认情况不共享,如果需要共享,在继承了适配器的handler上,增加注解@Sharable
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}

ChannelInboundHandler

最重要的方法是channelRead(),在使用时,需要显式的释放ByteBuf相关的内存。使用ReferenceCountUtil是引用计数的工具类
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // netty中的缓冲区叫做ByteBuf -- 对ByteBuffer的封装
        ByteBuf buf = (ByteBuf) msg;
        // 释放ByteBuf内存
        ReferenceCountUtil.release(msg);
    }
为了减少对资源内存的管理,使用SimpleChannelInboundHandler,使用其channelRead0()方法,可以自动释放资源,使用更便利
//SimpleChannelInboundHandler源码

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

ChannelPipeline

pipeline中维护入站和出站链路,两条链路的执行顺序
handler只负责处理自身的业务逻辑,对通道而言,它是无状态的
通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色
pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中
结点叫做HeadContext,尾节点叫做TailContext

在这里插入图片描述

 ch.pipeline().addLast(new NettyServerHandler());
 
 //DefaultChannelPipeline
 ----------------------------------------------------------------
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
    
    // 关键逻辑
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
            checkMultiplicity(handler);
		    // 将handler封装为context
            newCtx = newContext(group, filterName(name, handler), handler);
            // 将context添加到链表尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 判断当前通道的注册状态,如果是未注册,执行此逻辑
            if (!registered) {
                // 添加一个任务,当通道被注册后,能够回调handlerAdded方法
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 如果已被注册  执行调用handlerAdded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    
    
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    // 尾节点会提前声明并创建
    final AbstractChannelHandlerContext tail;
    
    //  prev -> tail   在其中插入newctx
    //  prev -> newctx -> tail   放到倒数第二个位置中  tail节点是保持不变的
    //  依次更改 新节点的前后指针   以及prev节点的后置指针和tail节点的前置指针
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    // 构造器中已经提前创建了头尾节点
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

使用pipeline模式的优点:

1) 解耦,让处理器逻辑独立,可以被多个channel共享
2) channel相关信息,交给context维护
3) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序

三.EventLoop

EventLoop事件循环,监听IO事件,内部封装了线程
EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池
当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件
一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理
同时,多个通道可以由一个EventLoop处理,是多对一的关系

EventLoopGroup

类层级结构
(通过选中NioEventLoopGroup源码 - 右键 - 选中Diagrams - 选中show diagram 展示出来)

在这里插入图片描述

初始化流程

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();


//NioEventLoopGroup
-------------------------------------------------------------------------
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    // 逐步增加参数
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    // 调用父类的构造器
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    
 //MultithreadEventLoopGroup
------------------------------------------------------------------------- 
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    // 初始化线程数量的逻辑     线程数 = cpu核数 * 2
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    
//MultithreadEventExecutorGroup    
------------------------------------------------------------------------- 
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    // 核心逻辑
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 根据线程数量  创建EventLoop的逻辑  
                // newChild()的具体实现在NioEventLoopGroup类中
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    
//NioEventLoopGroup    
-------------------------------------------------------------------------     
     @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }   
    
    

EventLoop

重要属性为 Selector 及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程

在这里插入图片描述

注册channel

 ChannelFuture future = serverBootstrap.bind(8888).sync();
 
 //AbstractBootstrap
 --------------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .....
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }  

注册的源码调用链路

ChannelFuture regFuture = config().group().register(channel);

//MultithreadEventLoopGroup
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
//SingleThreadEventLoop    
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
  
 
//AbstractChannel
------------------------------------------------------------------------------
//  核心逻辑是 调用了 register0()方法
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
				
			// channel中存在EventLoop类型的属性
            // 通道初始化时,会将指定的EventLoop与channel进行关联
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
        
        
     // 核心逻辑是调用了doRegister()方法
     private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }        
  

//AbstractNioChannel
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将通道注册进Selector中 此时感兴趣的事件是0
                // 并且将当前对象作为附加对象存入其中  等价selectionKey.attach()方法
                // 使用对象时   再通过selectionkey.attachment()方法取出对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    

轮询事件的状态

在这里插入图片描述

AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口
//SingleThreadEventExecutor
-----------------------------------------------------------------------------
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private final Queue<Runnable> taskQueue;
    //  当前类维护了一个任务队列
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 启动线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 真正的调用逻辑
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
                
                ......
            }
         }
     }

//NioEventLoop
-----------------------------------------------------------------------------
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT: 
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 判断队列中是否存在任务
                            if (!hasTasks()) { 
                                // 如果不存在  调用select()进行获取
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // 处理任务的核心逻辑
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用

    // 处理事件集合
    private void processSelectedKeys() {
        // 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            // 这是注册时,存储的附加对象,即为通道对象channel
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                // 处理具体事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    // 处理具体事件
    //  判断事件类型,调用对应的逻辑处理
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

其中读事件的处理

unsafe.read();

//AbstractNioByteChannel
----------------------------------------------------------------------------
public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

Bootstrap

引导,对应用程序进行配置,并让他运行起来的过程

1)配置

必选参数:group 、 channel、 handler(服务端 -- childHandler)
group(): 指定一个到两个reactor
channel():指定channel工厂,反射的方式创建channel使用
handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定
option():指定TCP相关的参数,以及netty自定义的参数
配置参数的过程,称之为初始化

2)运行

        // 启动并设置端口号  但需要使用sync异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            // 将关闭通道的方式 也设置为异步的
            //    阻塞finally中的代码执行
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来
sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果
future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法

3)源码

类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
init方法
工作内容
设置channel相关的选项参数
设置channel的属性键值对
添加对channel的IO事件处理器 (Acceptor角色)
void init(Channel channel) {
        // 设置channel相关的选项参数
        setChannelOptions(channel, newOptionsArray(), logger);
        // 设置channel的属性键值对
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                
                // 添加对channel的IO事件处理器 (Acceptor角色)
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

Acceptor分析

功能:将主reactor接收到的客户端通道,传递给从reactor
// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 此时msg对应 服务端接收到的客户端通道
            final Channel child = (Channel) msg;
		    // 设置处理链路
            child.pipeline().addLast(childHandler);
			// 设置通道的配置项和参数
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                // childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {					
                        // 增加监听 获取注册的异步结果
                        // 如果注册失败 或者抛出异常  都会关闭channel
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        
        
        // 如果处理客户端连接失败了,暂停一秒,然后继续接受
        // 为保障服务端能够尽可能多的处理客户端的连接  不受某一次处理失败的结果影响
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }

Future和Promise

Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等
对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次
参照生活中的定额发票(Future)和机打发票(Promise)
  • 作者:潘震
  • 评论

    留言