/**
 * Demonstrates the ByteBuf flavours created through {@code Unpooled}: a heap buffer, a direct
 * buffer and a composite buffer that groups other buffers.
 */
public class TypeTest {
    /**
     * Prints the content of a heap buffer, then lists the components of a composite buffer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.copiedBuffer("hello bytebuf", CharsetUtil.UTF_8);
        try {
            // Heap buffer: hasArray() reports whether a backing byte[] is exposed.
            if (buf.hasArray()) {
                System.out.println("堆内存缓冲区");
                // Decode only the readable bytes, with the charset the text was encoded in.
                // new String(buf.array()) would decode the entire backing array (whose length
                // need not equal readableBytes()) using the platform default charset.
                System.out.println(buf.toString(CharsetUtil.UTF_8));
            }
        } finally {
            buf.release();
        }

        // Heap buffer
        ByteBuf buf1 = Unpooled.buffer();
        // Direct (off-heap) buffer
        ByteBuf buf2 = Unpooled.directBuffer();
        // Composite buffer
        CompositeByteBuf buf3 = Unpooled.compositeBuffer();
        try {
            buf3.addComponents(buf1, buf2);
            // buf3.removeComponent(0);
            for (ByteBuf component : buf3) {
                System.out.println(component);
            }
        } finally {
            // Ownership of buf1/buf2 was handed to the composite by addComponents(...),
            // so releasing the composite is what frees them (including the direct memory).
            buf3.release();
        }
    }
}
/**
 * Walks a buffer's reference count up and down, printing the count after every step.
 */
public class RefCntTest {
    public static void main(String[] args) {
        final ByteBuf buffer = Unpooled.buffer(10);
        System.out.println(buffer);

        // Current reference count
        printRefCnt(buffer);

        // retain(): count + 1
        buffer.retain();
        printRefCnt(buffer);

        // release(): count - 1
        buffer.release();
        printRefCnt(buffer);

        // Second release, then print the buffer itself once more.
        buffer.release();
        System.out.println(buffer);
    }

    /** Prints the reference count currently reported by {@code buffer}. */
    private static void printRefCnt(ByteBuf buffer) {
        System.out.println(buffer.refCnt());
    }
}
/**
 * Shows how capacity, readerIndex, writerIndex, writableBytes and readableBytes of a ByteBuf
 * change across write, read, mark/reset, discardReadBytes and clear.
 */
public class IndexTest {
    /**
     * Runs the demonstration and prints the buffer state after every step.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer();
        printState(buf, false);

        System.out.println("--------写入hello index");
        // Encode with an explicit charset instead of the platform default.
        buf.writeBytes("hello index".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        printState(buf, true);

        System.out.println("--------读取hello");
        for (int i = 0; i < 5; i++) {
            System.out.print((char) buf.readByte());
        }
        System.out.println();
        printState(buf, true);

        System.out.println("--------读取index 并回退");
        buf.markReaderIndex();
        // buf.markWriterIndex();
        int end = buf.writerIndex();
        for (int i = buf.readerIndex(); i < end; i++) {
            System.out.print((char) buf.readByte());
        }
        System.out.println();
        // Go back to the position remembered by markReaderIndex().
        buf.resetReaderIndex();
        printState(buf, true);

        // Reclaim the already-read (discardable) region.
        buf.discardReadBytes();
        System.out.println("--------回收可废弃空间");
        printState(buf, true);

        buf.clear();
        System.out.println("--------调用clear方法");
        printState(buf, true);
    }

    /**
     * Prints capacity, both indexes and the writable byte count of {@code buf}.
     *
     * @param buf the buffer to describe
     * @param withReadable whether to print the readable byte count as well
     */
    private static void printState(ByteBuf buf, boolean withReadable) {
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());
        System.out.println("writableBytes: " + buf.writableBytes());
        if (withReadable) {
            System.out.println("readableBytes: " + buf.readableBytes());
        }
    }
}
/**
 * Compares the three ways of copying a ByteBuf used here: {@code duplicate()},
 * {@code slice()} and {@code copy()}.
 */
public class CopyTest {
    /**
     * Runs the demonstration and prints the indexes/content of each derived buffer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Explicit charset instead of the platform default used by getBytes().
        final java.nio.charset.Charset charset = java.nio.charset.StandardCharsets.UTF_8;

        ByteBuf buf = Unpooled.buffer();
        buf.writeBytes("hello bytebuf copy".getBytes(charset));
        System.out.println("----------初始化bytebuf");
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());

        // Shallow copy
        ByteBuf newBuf = buf.duplicate();
        System.out.println("----------duplicate newBuf");
        System.out.println("newBuf capacity: " + newBuf.capacity());
        System.out.println("newBuf readerIndex: " + newBuf.readerIndex());
        System.out.println("newBuf writerIndex: " + newBuf.writerIndex());
        newBuf.writeBytes(" from newBuf".getBytes(charset));
        System.out.println("----------duplicate add data");
        System.out.println("newBuf writerIndex: " + newBuf.writerIndex());

        // Reading beyond writerIndex throws IndexOutOfBoundsException, so move buf's writer
        // index up to where newBuf stopped writing before reading through buf.
        buf.writerIndex(newBuf.writerIndex());
        for (int i = 0; i < 13; i++) {
            System.out.print((char) buf.readByte());
        }
        System.out.println();
        System.out.println("capacity: " + buf.capacity());
        System.out.println("readerIndex: " + buf.readerIndex());
        System.out.println("writerIndex: " + buf.writerIndex());

        // Partial shallow copy ("slice"): covers the region readerIndex..writerIndex.
        // Per the original notes it can be read but not written, and its capacity is the
        // size of the readable region (writerIndex - readerIndex).
        ByteBuf sliceBuf = buf.slice();
        // Writing would throw IndexOutOfBoundsException:
        // sliceBuf.writeBytes(" sth".getBytes());
        System.out.println("-----------sliceBuf");
        System.out.println("capacity: " + sliceBuf.capacity());
        System.out.println("readerIndex: " + sliceBuf.readerIndex());
        System.out.println("writerIndex: " + sliceBuf.writerIndex());

        // Deep copy
        ByteBuf copyBuf = buf.copy();
        System.out.println("-----------copyBuf");
        System.out.println("capacity: " + copyBuf.capacity());
        System.out.println("readerIndex: " + copyBuf.readerIndex());
        System.out.println("writerIndex: " + copyBuf.writerIndex());
        copyBuf.writeBytes(" from copyBuf".getBytes(charset));
        // Read exactly what was written. Forcing writerIndex to a hard-coded 43 exposed
        // 13 bytes that were never written and printed them as NUL characters.
        while (copyBuf.isReadable()) {
            System.out.print((char) copyBuf.readByte());
        }
        System.out.println();

        System.out.println("-----------原buf");
        // The original buffer's readable content does not contain " from copyBuf".
        while (buf.isReadable()) {
            System.out.print((char) buf.readByte());
        }
    }
}
// Usage example: store the bytes of "pz" in a freshly allocated 8-byte buffer.
byte[] payload = "pz".getBytes();
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put(payload);
-----------------------------------------------------------
//底层原理:
/**
 * Stores the whole array by delegating to the ranged overload with offset 0 and the full length.
 *
 * @param src the bytes to store
 * @return whatever the ranged overload returns
 */
public final ByteBuffer put(byte[] src) {
    final int length = src.length;
    return put(src, 0, length);
}
//额外接收偏移量(存储数据的起始位置)和数据长度
/**
 * Stores {@code length} bytes of {@code src}, starting at {@code offset}, one byte at a time.
 *
 * @param src the source array
 * @param offset index of the first byte to store
 * @param length number of bytes to store
 * @return this buffer
 * @throws BufferOverflowException if {@code length} exceeds {@code remaining()}
 */
public ByteBuffer put(byte[] src, int offset, int length) {
    // Validate the arguments against the array size.
    checkBounds(offset, length, src.length);
    // Not enough room left for the requested number of bytes.
    if (remaining() < length) {
        throw new BufferOverflowException();
    }
    // Copy from offset up to (but excluding) offset + length.
    int index = offset;
    final int limit = offset + length;
    while (index < limit) {
        this.put(src[index]);
        index++;
    }
    return this;
}
/**
 * Prints a buffer's capacity before and after writing more bytes than it was created with.
 */
public class CapacityTest {
    public static void main(String[] args) {
        final ByteBuf buffer = Unpooled.buffer(10);
        printCapacity(buffer);
        writeSequence(buffer, 11);
        printCapacity(buffer);
        writeSequence(buffer, 65);
        printCapacity(buffer);
    }

    /** Prints the capacity currently reported by {@code buffer}. */
    private static void printCapacity(ByteBuf buffer) {
        System.out.println("capacity: " + buffer.capacity());
    }

    /** Writes the values 0 .. count-1 to {@code buffer} via writeByte. */
    private static void writeSequence(ByteBuf buffer, int count) {
        int value = 0;
        while (value < count) {
            buffer.writeByte(value);
            value++;
        }
    }
}
//查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现
//ByteBuf类
/** Abstract declaration only; the implementation quoted in these notes is the AbstractByteBuf one. */
public abstract ByteBuf writeByte(int value);
//AbstractByteBuf子类
/**
 * Makes sure one more byte fits, stores {@code value} at the current writer index and
 * advances the writer index by one.
 *
 * @param value the value passed on to {@code _setByte}
 * @return this buffer
 */
@Override
public ByteBuf writeByte(int value) {
    // Guarantee room for a single byte.
    ensureWritable0(1);
    // Claim the slot (writerIndex is advanced before the store, as with writerIndex++).
    final int index = writerIndex++;
    _setByte(index, value);
    return this;
}
// 参数为 最小写入数据的大小
/**
 * Ensures that {@code minWritableBytes} can be written, growing the buffer when the current
 * capacity is too small.
 *
 * @param minWritableBytes the minimum number of bytes that must be writable
 * @throws IndexOutOfBoundsException if bounds checking is on and the required capacity
 *         exceeds {@code maxCapacity}
 */
final void ensureWritable0(int minWritableBytes) {
    final int writeAt = writerIndex();
    // Capacity needed = current writer index + bytes about to be written.
    final int required = writeAt + minWritableBytes;

    // Enough capacity already: nothing to grow.
    if (required <= capacity()) {
        ensureAccessible();
        return;
    }

    // Not enough capacity and the requirement is beyond the maximum: fail.
    if (checkBounds && required > maxCapacity) {
        ensureAccessible();
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writeAt, minWritableBytes, maxCapacity, this));
    }

    // Growth: use the fast-writable room when it suffices, otherwise ask the allocator
    // for a newly calculated capacity.
    final int fastWritable = maxFastWritableBytes();
    final int grown;
    if (fastWritable >= minWritableBytes) {
        grown = writeAt + fastWritable;
    } else {
        grown = alloc().calculateNewCapacity(required, maxCapacity);
    }

    // Adjust to the new capacity.
    capacity(grown);
}
// 获取可写空间大小
/**
 * Reports the fast-writable byte count, which in this implementation is simply
 * {@code writableBytes()}.
 *
 * @return the value of {@code writableBytes()}
 */
public int maxFastWritableBytes() {
    final int writable = writableBytes();
    return writable;
}
//AbstractByteBufAllocator子类
// 计算要扩容的新容量大小
/**
 * Calculates the capacity to grow to for a buffer that needs at least {@code minNewCapacity}.
 *
 * <p>Exactly at the threshold the threshold is returned. Above it the result grows in
 * threshold-sized steps; below it the result is found by doubling, starting from 64. The
 * result never exceeds {@code maxCapacity}.
 *
 * @param minNewCapacity the minimum capacity required
 * @param maxCapacity the upper bound for the result
 * @return the new capacity
 * @throws IllegalArgumentException if {@code minNewCapacity} is greater than {@code maxCapacity}
 */
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    // Argument validation.
    checkPositiveOrZero(minNewCapacity, "minNewCapacity");
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }

    // Boundary between the two growth strategies (4 MiB page).
    final int threshold = CALCULATE_THRESHOLD;
    if (minNewCapacity == threshold) {
        return threshold;
    }

    if (minNewCapacity < threshold) {
        // Below the threshold: double, starting from 64, until the requirement is covered,
        // then cap at maxCapacity.
        int doubled = 64;
        while (doubled < minNewCapacity) {
            doubled <<= 1;
        }
        return Math.min(doubled, maxCapacity);
    }

    // Above the threshold: do not double, step by the threshold instead.
    // Example with minNewCapacity = 5M: 5 / 4 * 4 = 4M (rounded down to a threshold
    // multiple), then one more step gives 8M unless that would pass maxCapacity.
    final int roundedDown = minNewCapacity / threshold * threshold;
    return roundedDown > maxCapacity - threshold ? maxCapacity : roundedDown + threshold;
}
// Example: starting the server side.
ChannelFuture future = serverBootstrap.bind(8888).sync();
// Configuring the channel class (statement shown without its continuation).
serverBootstrap.channel(NioServerSocketChannel.class)
//AbstractBootstrap 启动对象的父类
/**
 * Binds to the given port by wrapping it in an {@code InetSocketAddress} and delegating to
 * the address-based overload.
 *
 * @param inetPort the port to bind
 * @return the future returned by the address-based overload
 */
public ChannelFuture bind(int inetPort) {
    final InetSocketAddress address = new InetSocketAddress(inetPort);
    return bind(address);
}
/**
 * Validates the bootstrap, null-checks the address and hands it to {@code doBind}.
 *
 * @param localAddress the address to bind; checked with {@code ObjectUtil.checkNotNull}
 * @return the future returned by {@code doBind}
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    final SocketAddress checkedAddress = ObjectUtil.checkNotNull(localAddress, "localAddress");
    return doBind(checkedAddress);
}
// Excerpt of doBind: obtains the registration future from initAndRegister() and reads the
// channel from it. The remainder of the method is elided in these notes.
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.......
}
// Excerpt of initAndRegister: the part after the try block is elided in these notes.
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Create the channel through the configured factory, then initialise it.
channel = channelFactory.newChannel();
init(channel);
}
.......
}
//ReflectiveChannelFactory 工厂实现类
/**
 * Creates a channel by invoking the stored constructor.
 *
 * @return the newly constructed instance
 * @throws ChannelException wrapping any {@code Throwable} raised during construction
 */
public T newChannel() {
    T created;
    try {
        created = constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
    return created;
}
// Usage excerpt: installs a LoggingHandler created with LogLevel.INFO (statement shown without its continuation).
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
------------------------------------------------------------------
/** Declaration excerpt (body omitted): LoggingHandler extends ChannelDuplexHandler. */
public class LoggingHandler extends ChannelDuplexHandler{}
------------------------------------------------------------------
/** Declaration excerpt (body omitted): extends the inbound adapter and also implements ChannelOutboundHandler. */
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
------------------------------------------------------------------
/** Declaration excerpt (body omitted): extends ChannelHandlerAdapter and implements ChannelInboundHandler. */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}
/** Declaration excerpt (body omitted): the class is annotated with @Sharable. */
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}
/**
 * Receives an inbound message as a ByteBuf and releases it once handling is finished.
 *
 * <p>The release sits in a {@code finally} block so the message is released even when the
 * cast or the processing throws.
 *
 * @param ctx the handler context
 * @param msg the inbound message, expected to be a {@code ByteBuf}
 * @throws Exception if processing fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        // Netty's buffer type is ByteBuf (the original notes describe it as a wrapper
        // around ByteBuffer).
        ByteBuf buf = (ByteBuf) msg;
        // ... process buf here ...
    } finally {
        // Release the ByteBuf memory.
        ReferenceCountUtil.release(msg);
    }
}
//SimpleChannelInboundHandler源码
/**
 * Dispatches an inbound message: accepted messages go to {@code channelRead0}, all others are
 * forwarded with {@code ctx.fireChannelRead}. Messages handled here are released afterwards
 * when {@code autoRelease} is set; forwarded messages are not released by this method.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Stays true only when this handler consumed the message itself.
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
// Not handled here: pass it on and leave releasing to whoever receives it.
release = false;
ctx.fireChannelRead(msg);
}
} finally {
// Runs even if channelRead0 throws.
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
// Usage excerpt: append a NettyServerHandler to the channel's pipeline.
ch.pipeline().addLast(new NettyServerHandler());
//DefaultChannelPipeline
----------------------------------------------------------------
/**
 * Appends the given handlers without an explicit executor group.
 *
 * @param handlers the handlers to append
 * @return the pipeline returned by the executor-aware overload
 */
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    final EventExecutorGroup noExecutor = null;
    return addLast(noExecutor, handlers);
}
/**
 * Appends handlers one by one, stopping at the first {@code null} element.
 *
 * @param executor the executor group passed along for every handler (may be {@code null})
 * @param handlers the handlers to append; must not be {@code null}
 * @return this pipeline
 */
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    int index = 0;
    // A null entry ends the walk; entries after it are not added.
    while (index < handlers.length && handlers[index] != null) {
        addLast(executor, null, handlers[index]);
        index++;
    }
    return this;
}
// 关键逻辑
/**
 * Wraps {@code handler} in a context, links it in front of the tail node and arranges for
 * {@code handlerAdded} to be invoked: later if the channel is not registered yet, on the
 * context's executor if the caller is not on it, or directly otherwise.
 *
 * @param group executor group used when creating the context
 * @param name name for the handler, passed through {@code filterName}
 * @param handler the handler to append
 * @return this pipeline
 */
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// Check whether the handler may be added: a handler that is not sharable and was
// already added triggers an exception.
checkMultiplicity(handler);
// Wrap the handler in a context.
newCtx = newContext(group, filterName(name, handler), handler);
// Link the context at the end of the list.
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// Branch taken while the channel is still unregistered.
if (!registered) {
// Queue a callback so handlerAdded is invoked once the channel is registered.
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// Already registered: invoke handlerAdded, on the context's executor if needed.
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
/**
 * Rejects a {@code ChannelHandlerAdapter} that is not sharable but has already been added;
 * otherwise marks the adapter as added. Handlers of other types are ignored.
 *
 * @param handler the handler about to be added
 * @throws ChannelPipelineException if a non-sharable adapter was added before
 */
private static void checkMultiplicity(ChannelHandler handler) {
    if (!(handler instanceof ChannelHandlerAdapter)) {
        return;
    }
    final ChannelHandlerAdapter adapter = (ChannelHandlerAdapter) handler;
    // Sharable adapters, and adapters seen for the first time, are fine.
    if (adapter.isSharable() || !adapter.added) {
        adapter.added = true;
        return;
    }
    throw new ChannelPipelineException(
            adapter.getClass().getName() +
            " is not a @Sharable handler, so can't be added or removed multiple times.");
}
/**
 * Wraps a handler in a new DefaultChannelHandlerContext bound to this pipeline, using the
 * executor obtained from {@code childExecutor(group)}.
 */
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 尾节点会提前声明并创建
// Tail node of the context list; assigned in the pipeline constructor.
final AbstractChannelHandlerContext tail;
// prev -> tail 在其中插入newctx
// prev -> newctx -> tail 放到倒数第二个位置中 tail节点是保持不变的
// 依次更改 新节点的前后指针 以及prev节点的后置指针和tail节点的前置指针
/**
 * Links {@code newCtx} directly in front of the tail node: {@code prev -> tail} becomes
 * {@code prev -> newCtx -> tail}. The tail node itself stays where it is.
 */
private void addLast0(AbstractChannelHandlerContext newCtx) {
// Node currently preceding the tail.
AbstractChannelHandlerContext prev = tail.prev;
// Set the new node's own links first, then repoint its two neighbours at it.
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// 构造器中已经提前创建了头尾节点
/**
 * Creates the pipeline for {@code channel}; the head and tail nodes are created here and
 * linked to each other.
 *
 * @param channel the owning channel; must not be {@code null}
 */
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// Head and tail exist from the start; initially head <-> tail with nothing in between.
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
// Usage excerpt: both groups are created with the no-argument constructor.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//NioEventLoopGroup
-------------------------------------------------------------------------
/** Delegates with a thread count of 0. */
public NioEventLoopGroup() {
this(0);
}
/** Delegates with the given thread count and a {@code null} executor. */
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
/** Delegates, adding the provider returned by {@code SelectorProvider.provider()}. */
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// 逐步增加参数
/** Delegates, adding {@code DefaultSelectStrategyFactory.INSTANCE} as the select strategy factory. */
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// 调用父类的构造器
/**
 * End of the constructor chain: calls the superclass constructor, adding
 * {@code RejectedExecutionHandlers.reject()} as the last argument.
 */
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
//MultithreadEventLoopGroup
-------------------------------------------------------------------------
/** Replaces a thread count of 0 with {@code DEFAULT_EVENT_LOOP_THREADS} before calling the superclass constructor. */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 初始化线程数量的逻辑 线程数 = cpu核数 * 2
/**
 * Default number of event loop threads: the {@code io.netty.eventLoopThreads} system property
 * when set, otherwise twice the available processor count, and never less than 1.
 */
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    final int configuredThreads = SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2);
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, configuredThreads);

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
//MultithreadEventExecutorGroup
-------------------------------------------------------------------------
/** Delegates, adding {@code DefaultEventExecutorChooserFactory.INSTANCE} as the chooser factory. */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// 核心逻辑
/**
 * Creates {@code nThreads} child executors via {@code newChild}, builds the chooser over them,
 * and registers a listener that completes {@code terminationFuture} once every child has
 * terminated.
 *
 * @param nThreads number of children to create; must be greater than 0
 * @param executor executor to use; when {@code null} a ThreadPerTaskExecutor is created
 * @param chooserFactory factory for the chooser over the children
 * @param args arguments forwarded to every {@code newChild} call
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 * @throws IllegalStateException if creating a child fails
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// One child is created per requested thread.
// newChild() is overridden by the concrete group (NioEventLoopGroup in these notes).
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// On failure, shut down the children created so far and wait for them to terminate.
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
// Completes terminationFuture when the last child reports termination.
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// Read-only view of the children, in creation order.
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
//NioEventLoopGroup
-------------------------------------------------------------------------
/**
 * Creates one NioEventLoop from the positional arguments: {@code args[0]} is the selector
 * provider, {@code args[1]} the select strategy factory, {@code args[2]} the rejected
 * execution handler and, only when exactly four arguments are given, {@code args[3]} the
 * task queue factory.
 *
 * @param executor the executor handed to the new event loop
 * @param args the positional arguments described above
 * @return the new event loop
 */
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = null;
    if (args.length == 4) {
        queueFactory = (EventLoopTaskQueueFactory) args[3];
    }
    final SelectorProvider provider = (SelectorProvider) args[0];
    final SelectStrategyFactory strategyFactory = (SelectStrategyFactory) args[1];
    return new NioEventLoop(this, executor, provider,
            strategyFactory.newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
// Usage excerpt: bind port 8888; the call chain below is what the following sections trace.
ChannelFuture future = serverBootstrap.bind(8888).sync();
//AbstractBootstrap
--------------------------------------------------------------------------
/**
 * Binds to the given port by wrapping it in an {@code InetSocketAddress} and delegating to
 * the address-based overload.
 *
 * @param inetPort the port to bind
 * @return the future returned by the address-based overload
 */
public ChannelFuture bind(int inetPort) {
    final InetSocketAddress address = new InetSocketAddress(inetPort);
    return bind(address);
}
/**
 * Validates the bootstrap, null-checks the address and hands it to {@code doBind}.
 *
 * @param localAddress the address to bind; checked with {@code ObjectUtil.checkNotNull}
 * @return the future returned by {@code doBind}
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    final SocketAddress checkedAddress = ObjectUtil.checkNotNull(localAddress, "localAddress");
    return doBind(checkedAddress);
}
// Excerpt of doBind: obtains the registration future from initAndRegister() and reads the
// channel from it. The remainder of the method is elided in these notes.
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.....
}
/**
 * Creates and initialises a channel, then registers it with the configured group.
 *
 * @return the registration future, or a failed promise if creation/initialisation threw
 */
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// A channel was created but init failed: close it forcibly and report the failure.
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// No channel could be created: report the failure on a FailedChannel placeholder.
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
// Registration already failed: close the channel (forcibly if it never got registered).
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
// Excerpt from initAndRegister: the registration call traced in the sections that follow.
ChannelFuture regFuture = config().group().register(channel);
//MultithreadEventLoopGroup
------------------------------------------------------------------------------
/** Picks an event loop with {@code next()} and registers the channel with it. */
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
//SingleThreadEventLoop
------------------------------------------------------------------------------
/**
 * Registers a channel by wrapping it in a {@code DefaultChannelPromise} tied to this event
 * loop and delegating to the promise-based overload.
 *
 * @param channel the channel to register
 * @return the future returned by the promise-based overload
 */
public ChannelFuture register(Channel channel) {
    final DefaultChannelPromise registration = new DefaultChannelPromise(channel, this);
    return register(registration);
}
/**
 * Registers the promise's channel with this event loop through the channel's unsafe object.
 *
 * @param promise the promise carrying the channel; must not be {@code null}
 * @return the same promise
 */
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    final Channel target = promise.channel();
    target.unsafe().register(this, promise);
    return promise;
}
//AbstractChannel
------------------------------------------------------------------------------
// 核心逻辑是 调用了 register0()方法
/**
 * Binds this channel to {@code eventLoop} and runs {@code register0}, either directly when
 * already on the event loop or as a task submitted to it.
 *
 * @param eventLoop the event loop to register with; must not be {@code null}
 * @param promise failed if the channel is already registered, the event loop is incompatible,
 *        or the event loop rejects the registration task
 */
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// The channel has an EventLoop field;
// the given event loop is associated with the channel here.
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 核心逻辑是调用了doRegister()方法
/**
 * Performs the registration: calls {@code doRegister()}, updates the registration flags,
 * completes the promise and fires the registered (and possibly active) events. Any failure
 * closes the channel forcibly and fails the promise.
 */
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// Captured before the flag is cleared below; decides whether channelActive is fired.
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
//AbstractNioChannel
----------------------------------------------------------------------------
// 核心注册逻辑
/**
 * Registers the underlying Java channel with the event loop's selector, retrying once after a
 * forced {@code selectNow()} if a cancelled key is still cached.
 *
 * @throws Exception the CancelledKeyException is rethrown if the retry fails as well
 */
protected void doRegister() throws Exception {
// Whether selectNow() has already been forced once.
boolean selected = false;
for (;;) {
try {
// Register the channel with the selector; the interest set passed here is 0.
// This object is passed as the attachment (equivalent to selectionKey.attach()),
// so it can later be obtained again through selectionKey.attachment().
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
//SingleThreadEventExecutor
-----------------------------------------------------------------------------
/**
 * Submits a task. The task counts as "immediate" when it is not a {@code LazyRunnable} and
 * {@code wakesUpForTask} returns true for it.
 *
 * @param task the task to run; must not be {@code null}
 */
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    final boolean immediate = !(task instanceof LazyRunnable) && wakesUpForTask(task);
    execute(task, immediate);
}
// This class maintains its own queue of tasks.
private final Queue<Runnable> taskQueue;
/**
 * Adds the task, starts the executor thread when called from outside the event loop, and
 * wakes the loop up when required.
 *
 * @param task the task to add
 * @param immediate whether the loop should be woken up for this task
 */
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// Start the thread (startThread itself only acts while the state is "not started").
startThread();
// Already shut down: try to take the task back and reject it.
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
/**
 * Starts the executor thread if the state can be switched from "not started" to "started";
 * the state is switched back if {@code doStartThread()} does not complete.
 */
private void startThread() {
if (state == ST_NOT_STARTED) {
// Only the caller that wins this compare-and-set goes on to start the thread.
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
// Starting failed: restore the "not started" state.
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
// Excerpt of doStartThread: submits a Runnable to the executor that records the running
// thread and then calls this executor's run(). The rest of the method is elided in these notes.
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// The actual work: the run() method of the enclosing executor.
SingleThreadEventExecutor.this.run();
success = true;
}
......
}
}
}
//NioEventLoop
-----------------------------------------------------------------------------
/**
 * The event loop body: repeatedly computes a select strategy, selects when no tasks are
 * pending, processes the selected keys, runs queued tasks according to {@code ioRatio}, and
 * handles shutdown. Returns only once shutdown has been confirmed.
 */
protected void run() {
// Counts consecutive selects; reset whenever tasks ran or keys were selected.
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// Check whether the task queue holds any tasks.
if (!hasTasks()) {
// None pending: call select() with the current deadline.
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
// Core step: process the selected keys.
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
// 处理事件集合
/**
 * Processes the ready keys. When the {@code selectedKeys} field is set, the optimized path is
 * used; otherwise the key set is fetched from the selector and handled by the plain path.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        // No key set held in the field: ask the selector for its selected keys.
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized();
}
/**
 * Iterates over the given key set, removing each key and dispatching it according to the type
 * of its attachment. Restarts the iteration on a fresh key set when {@code needsToSelectAgain}
 * is set.
 *
 * @param selectedKeys the selected keys to process
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
// The attachment stored at registration time, i.e. the channel object.
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
// Handle the concrete event for this channel.
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
// 处理具体事件
// 判断事件类型,调用对应的逻辑处理
/**
 * Handles one ready key for a channel: closes the channel for an invalid key (only if it is
 * still registered to this event loop), otherwise dispatches by ready operation — connect,
 * then write, then read/accept.
 *
 * @param k the selection key
 * @param ch the channel attached to the key
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// The key was cancelled while being processed: close the channel.
unsafe.close(unsafe.voidPromise());
}
}
// Excerpt from processSelectedKey: the read call traced in the next section.
unsafe.read();
//AbstractNioByteChannel
----------------------------------------------------------------------------
/**
 * Reads from the channel in a loop: allocates a buffer, fills it with {@code doReadBytes},
 * and fires {@code channelRead} for each non-empty read; afterwards fires
 * {@code channelReadComplete} and closes on EOF (a negative read count).
 */
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
// The buffer was handed to the pipeline; drop the local reference so the
// exception handler below does not see it.
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
// Bind the port. bind(...) is asynchronous; sync() makes the calling thread wait until the
// bind operation has completed.
try {
    ChannelFuture future = serverBootstrap.bind(8888).sync();
    // Wait until the server channel is closed. This keeps the finally block (which shuts
    // the event loop groups down) from running while the server is still in use.
    future.channel().closeFuture().sync();
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can react to it,
    // instead of silently swallowing the interruption.
    Thread.currentThread().interrupt();
    e.printStackTrace();
} finally {
    // Graceful shutdown of both groups.
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
/**
 * Initialises the server channel: applies options and attributes, snapshots the child
 * configuration, and adds an initializer that installs the configured handler and, via a task
 * on the channel's event loop, a ServerBootstrapAcceptor.
 *
 * @param channel the channel to initialise
 */
void init(Channel channel) {
// Apply the channel options.
setChannelOptions(channel, newOptionsArray(), logger);
// Apply the channel attribute key/value pairs.
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
// Snapshot the child settings so the anonymous classes below can capture them.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// Add the handler for the channel's IO events (the Acceptor role);
// the add itself is run as a task on the channel's event loop.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
/**
 * Configures an accepted child channel and registers it with the child group; the child is
 * force-closed if registration fails or throws.
 *
 * @param ctx the handler context
 * @param msg the accepted child channel (cast to {@code Channel})
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg is the client channel accepted by the server.
final Channel child = (Channel) msg;
// Install the child handler on the child's pipeline.
child.pipeline().addLast(childHandler);
// Apply the child channel options and attributes.
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// childGroup is the group the child channels are registered with (the "sub reactor"
// in the original notes).
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Listener for the asynchronous registration result:
// an unsuccessful registration closes the child channel.
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// 如果处理客户端连接失败了,暂停一秒,然后继续接受
// 为保障服务端能够尽可能多的处理客户端的连接 不受某一次处理失败的结果影响
/**
 * On an exception, pauses accepting for one second when auto-read is enabled, then passes the
 * exception on through the pipeline.
 *
 * @param ctx the handler context
 * @param cause the exception that was caught
 */
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig channelConfig = ctx.channel().config();
    final boolean autoReadEnabled = channelConfig.isAutoRead();
    if (autoReadEnabled) {
        // stop accept new connections for 1 second to allow the channel to recover
        // See https://github.com/netty/netty/issues/1328
        channelConfig.setAutoRead(false);
        final EventLoop loop = ctx.channel().eventLoop();
        loop.schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
    }
    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
}
评论