Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明
作者: Grey
原文地址:
博客园:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明
CSDN:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明
说明
在 Netty 服务端代码中,我们一般会创建了两个 NioEventLoopGroup:bossGroup 和 workerGroup
其中: bossGroup
用于监听端口,接收新连接的线程组;workerGroup
用于处理每一个连接的数据读写的线程组。
bossGroup 创建第一个 NioEventLoop 线程
NioEventLoop 的启动入口在AbstractUnsafe
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
其中inEventLoop()
方法调用的是AbstractEventExecutor
的实现
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
而这个实现又调用了子类SingleThreadEventExecutor
的如下方法
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
在服务端刚启动的时候,Thread.currentThread()
就是当前 main 方法对应的主线程,而this.thread
还没有开始赋值,所以此时为null,
所以eventLoop.inEventLoop()
在一开始调用的时候,返回的是 false,进入AbstractUnsafe
的如下else
逻辑中
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... AbstractChannel.this.eventLoop = eventLoop; // 首次执行的时候 eventLoop.inEventLoop() 返回 false,执行 else 逻辑 if (eventLoop.inEventLoop()) { ...... } else { ...... eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ...... } }
其中executor
方法对应的是SingleThreadEventExecutor
的execute
方法
private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { ...... } } if (!addTaskWakesUp && immediate) { ...... } }
inEventLoop()
经过上述分析,为false
,所以执行startThread()
方法
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
这里主要的逻辑就是判断线程是否启动,如果没有启动,就调用doStartThread()
启动。doStartThread()
的逻辑是
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); ... SingleThreadEventExecutor.this.run(); ...... } }); }
通过一个成员变量thread
来保存ThreadPerTaskExecutor
创建出来的线程(即:FastThreadLocalThread),NioEventLoop 保存完线程的引用之后,随即调用 run 方法。
workGroup 对应的 NioEventLoop 创建线程和启动
workGroup 对应的 NioEventLoop 创建的线程主要做如下事情
-
执行一次事件轮询。首先轮询注册到 Reactor 线程对应的 Selector 上的所有 Channel 的 IO 事件。
-
处理产生 IO 事件的 Channel。如果有读写或者新连接接入事件,则处理:
-
处理任务队列。
以上三个步骤分别对应了下述三个方法
事件轮询
事件轮询调用了NioEventLoop
的如下方法
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
处理 IO 事件的 Channel
调用的是NioEventLoop
的如下方法
private void processSelectedKeys() { if (selectedKeys != null) { // 处理优化过的 SelectedKeys processSelectedKeysOptimized(); } else { // 处理正常的 SelectedKeys processSelectedKeysPlain(selector.selectedKeys()); } }
上述两个分支分别处理了不同类型的 key:重点关注优化过的 SelectedKeys,selectedKeys 在 NioEventLoop 中是一个SelectedSelectionKeySet
对象,这个对象虽然叫Set
,但是底层使用了数组
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; } ...... }
add 方法的主要流程是:
-
将SelectionKey塞到该数组的尾部;
-
更新该数组的逻辑长度+1;
-
如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容。
待程序运行一段时间后,等数组的长度足够长,每次在轮询到 NIO 事件的时候,Netty 只需要O(1)
的时间复杂度就能将SelectionKey
塞到set
中去,而 JDK 底层使用的HashSet,put的时间复杂度最少是O(1),最差是O(n)。
进入processSelectedKeysOptimized
方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
主要是三个步骤:
第一步,取出 IO 事件及对应的 Channel。其中selectedKeys[i] = null;
的目的是防止内存泄漏
第二步,处理 Channel
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
Netty 的轮询注册机制其实是将 AbstractNioChannel 内部的 JDK 类 SelectableChannel 对象注册到 JDK 类 Selector 对象上,并且将 AbstractNioChannel 作为SelectableChannel 对象的一个 attachment 附属上,这样在 JDK 轮询出某条 SelectableChannel 有 IO 事件发生时,就可以直接取出 AbstractNioChannel 进行后续操作。
在Netty的Channel中,有两大类型的Channel,
一个是NioServerSocketChannel,由boss NioEventLoopGroup负责处理;
一个是NioSocketChannel,由worker NioEventLoop负责处理,
所以:
(1)对于boss NioEventLoop来说,轮询到的是连接事件,后续通过NioServerSocketChannel的Pipeline将连接交给一个worker NioEventLoop处理;
(2)对于worker NioEventLoop来说,轮询到的是读写事件,后续通过NioSocketChannel的Pipeline将读取到的数据传递给每个ChannelHandler来处理。
第三步,判断是否需要再一次轮询
由needsToSelectAgain
变量控制,needsToSelectAgain
变量在如下方法中被调用,在NioEventLoop
中
private static final int CLEANUP_INTERVAL = 256; void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } }
cancel方法是用于将key取消,并且在被取消的key到达CLEANUP_INTERVAL
的时候,设置needsToSelectAgain
为 true,CLEANUP_INTERVAL
默认值为256。
也就是说,对于每个NioEventLoop而言,每隔256个Channel从Selector上移除的时候,就标记needsToSelectAgain为true,然后将SelectedKeys的内部数组全部清空,方便JVM垃圾回收,然后调用selectAgain重新填装SelectionKeys数组。
处理任务队列
调用的是如下方法
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = getCurrentTimeNanos(); } afterRunningAllTasks(); return ranAtLeastOne; }
一路追踪下去,进入SingleThreadEventExecutor
的 offerTask()
方法
final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
Netty 内部使用一个 taskQueue 将Task保存起来。这个 taskQueue 其实是一个 MPSC Queue,每一个 NioEventLoop 都与它一一对应。
接下来执行SingleThreadEventExecutor
的 runAllTasks()
方法
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = getCurrentTimeNanos(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = getCurrentTimeNanos(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
主要流程如下:
1.NioEventLoop在执行过程中不断检测是否有事件发生,如果有事件发生就处理,处理完事件之后再处理外部线程提交过来的异步任务。
2.在检测是否有事件发生的时候,为了保证异步任务的及时处理,只要有任务要处理,就立即停止事件检测,随即处理任务。
3.外部线程异步执行的任务分为两种:定时任务和普通任务,分别落地到 MpscQueue 和 PriorityQueue ,而 PriorityQueue 中的任务最终都会填充到 MpscQueue 中处理。
4.Netty每隔64个任务检查一次是否该退出任务循环。
完整代码见:hello-netty
本文所有图例见:processon: Netty学习笔记
更多内容见:Netty专栏