Netty 学习(五):服务端启动核心流程源码说明
作者: Grey
原文地址:
CSDN:Netty 学习(五):服务端启动核心流程源码说明
说明
本文使用的 Netty 版本是 4.1.82.Final,
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.82.Final</version> </dependency>
服务端在启动的时候,主要流程有如下几个
-
创建服务端的 Channel
-
初始化服务端的 Channel
-
注册 Selector
-
端口绑定
我们可以写一个简单的服务端代码,通过 Debug 的方式查看这几个关键流程的核心代码。
package source; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 代码阅读 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since */ public final class SimpleServer { public static void main(String[] args) throws InterruptedException { // EventLoopGroup: 服务端的线程模型外观类。这个线程要做的事情 // 就是不停地检测IO事件,处理IO事件,执行任务。 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务端的一个启动辅助类。通过给它设置一系列参数来绑定端口启动服务。 ServerBootstrap b = new ServerBootstrap(); b // 设置服务端的线程模型。 // bossGroup 负责不断接收新的连接,将新的连接交给 workerGroup 来处理。 .group(bossGroup, workerGroup) // 设置服务端的 IO 类型是 NIO。Netty 通过指定 Channel 的类型来指定 IO 类型。 .channel(NioServerSocketChannel.class) // 服务端启动过程中,需要经过哪些流程。 .handler(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); } }) // 用于设置一系列 Handler 来处理每个连接的数据 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { } }); // 绑定端口同步等待。等服务端启动完毕,才会进入下一行代码 ChannelFuture f = b.bind(8888).sync(); // 等待服务端关闭端口绑定,这里的作用是让程序不会退出 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
通过
ChannelFuture f = b.bind(8888).sync();
的bind
方法,进入源码进行查看。
首先,进入的是AbstractBootstrap
中,调用的最关键的方法是如下两个:
…… private ChannelFuture doBind(final SocketAddress localAddress) { …… final ChannelFuture regFuture = initAndRegister(); …… doBind0(regFuture, channel, localAddress, promise); …… } ……
进入initAndResgister()
方法中
…… final ChannelFuture initAndRegister() { …… // channel 的新建 channel = channelFactory.newChannel(); // channel 的初始化 init(channel); …… } ……
这里完成了 Channel 的新建和初始化,Debug 进去,发现channelFactory.newChannel()
实际上是调用了ReflectiveChannelFactory
的newChannel
方法,
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { …… private final Constructor<? extends T> constructor; public ReflectiveChannelFactory(Class<? extends T> clazz) { …… this.constructor = clazz.getConstructor(); …… } @Override public T newChannel() { …… return constructor.newInstance(); …… } …… }
这里调用了反射方法,其实就是将服务端代码中的这一行.channel(NioServerSocketChannel.class)
中的NioServerSocketChannel.class
传入进行对象创建,创建一个NioServerSocketChannel
实例。
在创建NioServerSocketChannel
的时候,调用了NioServerSocketChannel
的构造方法,构造方法的主要逻辑如下
…… public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) { this(newChannel(provider, family)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) { …… ServerSocketChannel channel = SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family); return channel == null ? provider.openServerSocketChannel() : channel; …… } ……
其中provider.openServerSocketChannel()
就是调用底层 JDK 的 API,获取了 JDK 底层的java.nio.channels.ServerSocketChannel
。
通过super(null, channel, SelectionKey.OP_ACCEPT);
一路跟踪进去,进入AbstractNioChannel
中,
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); …… ch.configureBlocking(false); …… }
关键代码是ch.configureBlocking(false)
,设置 I/O 模型为非阻塞模式。
通过super(parent)
跟踪上去,
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
其中 id 是 Netty 中每条 Channel 的唯一标识。
以上就是服务端 Channel 的创建过程。
接下来是服务端 Channel 的初始化过程,回到AbstractBootstrap.initAndResgister()
方法
…… final ChannelFuture initAndRegister() { …… // channel 的新建 channel = channelFactory.newChannel(); // channel 的初始化 init(channel); …… } ……
其中的init(channel)
方法就是服务端的 Channel 的初始化过程,Debug 进入,发现是调用了ServerBootstrap.init(channel)
方法,
@Override void init(Channel channel) { …… // 设置一些 Channel 的属性和配置信息 …… p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
其核心代码如上,主要用于定义服务端启动过程中需要执行哪些逻辑。主要分为两块:
-
一块是添加用户自定义的处理逻辑到服务端启动流程。
-
另一块是添加一个特殊的处理逻辑,ServerBootstrapAcceptor 是一个接入器,接受新请求,把新的请求传递给某个事件循环器。
以上就是服务端的 Channel 的初始化过程。接下来是服务端 Channel 的注册 Selector 的过程。
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
在这个步骤中,我们可以看到关于 JDK 底层的操作
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
首先拿到在前面过程中创建的 JDK 底层的 Channel,然后调用 JDK 的 register() 方法,将 this 也即 NioServerSocketChannel 对象当作 attachment 绑定到 JDK 的 Selector 上,这样后续从 Selector 拿到对应的事件之后,就可以把 Netty 领域的 Channel 拿出来。
接下来是服务端绑定端口的逻辑,见AbstractBootstrap
中的doBind0
方法
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
图例
本文所有图例见:processon: Netty学习笔记
代码
更多内容见:Netty专栏