大纲
1.读数据入口
2.拆包原理
3.ByteToMessageDecoder解码步骤
4.解码器抽象的解码过程总结
5.Netty里常见的开箱即用的解码器
6.writeAndFlush()方法的大体步骤
7.MessageToByteEncoder的编码步骤
8.unsafe.write()写队列
9.unsafe.flush()刷新写队列
10.如何把对象变成字节流写到unsafe底层
1.读数据入口
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
在传播的过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法。
注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop { Selector selector; private SelectedSelectionKeySet selectedKeys; private boolean needsToSelectAgain; private int cancelledKeys; ... @Override protected void run() { for (;;) { ... //1.调用select()方法执行一次事件轮询 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } ... //2.处理产生IO事件的Channel needsToSelectAgain = false; processSelectedKeys(); ... //3.执行外部线程放入TaskQueue的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); ... //新连接已准备接入或者已经存在的连接有数据可读 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法 //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法 unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ... } public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected class NioByteUnsafe extends AbstractNioUnsafe { ... @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //创建ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; do { //1.分配一个ByteBuf byteBuf = allocHandle.allocate(allocator); //2.将数据读取到分配的ByteBuf中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } ... //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件 pipeline.fireChannelReadComplete(); ... } } }
NioByteUnsafe主要会进行如下处理:
一.通过客户端Channel的ChannelConfig获取内存分配器ByteBufAllocator,然后用内存分配器来分配一个ByteBuf对象
二.将客户端Channel中的TCP缓冲区的数据读取到ByteBuf对象
三.读取完数据后便调用DefaultChannelPipeline的fireChannelReadComplete()方法,从HeadContext结点开始在整个ChannelPipeline中传播ChannelRead事件
2.拆包原理
一.不用Netty的拆包原理
不断地从TCP缓冲区中读取数据,每次读完都判断是否为一个完整的数据包。如果当前读取的数据不足以拼接成一个完整的数据包,就保留数据,继续从TCP缓冲区中读。如果当前读取的数据加上已读取的数据足够拼成完整的数据包,就将拼好的数据包往业务传递,而多余的数据则保留。
二.Netty的拆包原理
Netty拆包基类内部会有一个字节容器,每次读取到数据就添加到字节容器中。然后尝试对累加的字节数据进行拆包,拆成一个完整的业务数据包,这个拆包基类叫ByteToMessageDecoder。
3.ByteToMessageDecoder解码步骤
(1)解码的整体步骤
(2)首先累加字节流
(3)然后调用子类的decode()方法进行解析
(4)接着清理累加字节容器
(5)最后将解析到的ByteBuf向下传播
(1)解码的整体介绍
一.累加字节流
Netty会通过一个ByteBuf字节容器cumulation,把所有读取到的字节流累加到该字节容器。
二.调用子类的decode()方法进行解析
把累加字节容器里的字节流通过子类的decode()方法进行解析。
三.将解析到的ByteBuf向下传播
如果调用子类的decode()方法可以解析到一个ByteBuf对象,则将这个ByteBuf对象向下传播。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ... public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //只基于ByteBuf对象进行解码 if (msg instanceof ByteBuf) { //1.累加字节流 //2.调用子类的decode()方法进行解析 //3.清理累加字节容器 //4.将解析到的ByteBuf向下传播 } else { ctx.fireChannelRead(msg); } } ... }
(2)首先累加字节流
如果当前字节容器中没有数据,那么就将字节容器的指针指向新读取的数据。如果当前字节容器中有数据,那么就调用累加器的cumulate()方法将数据累加到字节容器。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ByteBuf cumulation;//字节容器 private Cumulator cumulator = MERGE_CUMULATOR;//默认的累加器 private boolean decodeWasNull; private boolean first; private int discardAfterReads = 16; private int numReads; ... //Cumulate ByteBufs by merge them into one ByteBuf's, using memory copies. public static final Cumulator MERGE_CUMULATOR = new Cumulator() { //累加器的累加方法,会传入一个字节容器cumulation @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer;//一个大的字节容器,用来copy传入的字节容器 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } buffer.writeBytes(in);//将当前数据累加到copy的字节容器中 in.release(); return buffer;//返回copy的字节容器 } }; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { //1.累加字节流 ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) {//如果当前字节容器中没有数据 //就将字节容器的指针指向新读取的数据 cumulation = data; } else {//如果当前字节容器中有数据 //则调用累加器的cumulate()方法将数据累加到字节容器 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //2.将字节容器里的数据传递给业务拆包器进行拆包 //调用callDecode()方法对数据进行拆包 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //3.清理字节容器 if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //4.将解析到的ByteBuf向下传播 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } } static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation; cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation); oldCumulation.release(); return cumulation; } ... }
(3)然后调用子类的decode()方法进行解析
将数据累加到字节容器之后,就会调用callDecode()方法,这个方法会尝试将字节容器的数据拆分成业务数据包并将业务数据包放入业务数据容器out中。
Netty对各种用户协议的支持就体现在ByteToMessageDecoder的抽象方法decode()中,decode()方法的入参是当前读取到的未被处理的所有数据in和业务数据包容器out,所有拆包器都需要实现ByteToMessageDecoder的decoed()方法。
拆包器完成一次拆包后:如果没有拆到一个完整的数据包,此时若拆包器未读取任何数据则跳出循环,否则继续拆包。如果已经拆到一个完整的数据包,但此时拆包器未读取任何数据,则抛出一个异常DecodeException。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ... //Called once data should be decoded from the given ByteBuf. //This method will call #decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@param out,the List to which decoded messages should be added protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); //Check if this handler was removed before continuing with decoding. //If it was removed, it is not safe to continue to operate on the buffer. if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); //调用拆包器实现的decode()方法 decode(ctx, in, out); //拆包器完成一次拆包后: //Check if this handler was removed before continuing the loop. //If it was removed, it is not safe to continue to operate on the buffer. if (ctx.isRemoved()) { break; } //outSize == out.size()表示没有拆到一个完整的数据包 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { //此时拆包器未读取任何数据则跳出循环 break; } else { //此时拆包器已读取到数据则继续拆包 continue; } } //执行到此处表明已经拆到一个完整的数据包 if (oldInputLength == in.readableBytes()) { //此时拆包器未读取任何数据,于是抛出一个异常DecodeException throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } //Decode the from one ByteBuf to an other. //This method will be called till either the input ByteBuf has nothing to read //when return from this method or till nothing was read from the input ByteBuf. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@param out,the List to which decoded messages should be added //@throws Exception,is thrown if an error accour protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; ... }
(4)接着清理字节容器
拆包器完成拆包后,只是从字节容器中取走了数据,但这部分空间对字节容器来说还依然被保留。而字节容器每次累加字节数据时都是将字节数据追加到尾部,如果不对字节容器进行清理,那么时间一长可能就会OOM。
正常情况下,每次读取完数据之后,ByteToMessageDecoder解码器都会在channelReadComplete()方法里清理字节容器。但是如果发送端发送数据过快,那么解码器的channelReadComplete()方法可能会很久才被调用一次。
所以为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据并完成业务拆包后,清理字节容器。如果字节容器当前已无数据可读,则调用字节容器的release()方法释放字节容器。如果字节容器当前还有数据可读,并且已经连续读取了16次还有未拆包的数据,那么就进行压缩处理。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { private int discardAfterReads = 16; ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { //1.累加字节流 ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) {//如果当前字节容器中没有数据 //就将字节容器的指针指向新读取的数据 cumulation = data; } else {//如果当前字节容器中有数据 //则调用累加器的cumulate()方法将数据累加到字节容器 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //2.将字节容器里的数据传递给业务拆包器进行拆包 //调用callDecode()方法对数据进行拆包 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //3.清理字节容器 if (cumulation != null && !cumulation.isReadable()) { //如果字节容器当前已无数据可读,则设置numReads为0,并释放字节容器cumulation numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) {//numReads >= 16 //如果当前还有数据可读,并且已经连续读取了16次即numReads >= 16, //此时字节容器中仍有未被业务拆包器拆包的数据,那么就做一次压缩处理; numReads = 0; discardSomeReadBytes(); } //4.将解析到的ByteBuf向下传播 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } } //Get numElements out of the CodecOutputList and forward these through the pipeline. static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { //遍历业务数据包容器 for(int i = 0; i < numElements; i ++) { //将一个个完整的业务数据包ByteBuf传递到后续的ChannelHandler进行处理 ctx.fireChannelRead(msgs.getUnsafe(i)); } } ... }
(5)最后将解析到的ByteBuf向下传播
也就是调用fireChannelRead()方法,遍历业务数据包容器,将一个个完整的业务数据包ByteBuf传递到后续的ChannelHandler中进行处理。
4.解码器抽象的解码过程总结
解码过程是通过一个叫ByteToMessageDecoder的抽象解码器来实现的,ByteToMessageDecoder实现的解码过程分为如下四步。
步骤一:累加字节流
也就是把当前读到的字节流累加到一个字节容器里。
步骤二:调用子类的decode()方法进行解析
ByteToMessageDecoder的decode()方法是一个抽象方法,不同种类的解码器会有自己的decode()方法逻辑。该decode()方法被调用时会传入两个关键参数:一个是ByteBuf对象表示当前累加的字节流,一个是List列表用来存放被成功解码的业务数据包。
步骤三:清理字节容器
为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据并完成业务拆包后,清理字节容器。
步骤四:传播已解码的业务数据包
如果List列表里有解析出来的业务数据包,那么就通过pipeline的事件传播机制往下进行传播。
5.Netty里常见的开箱即用的解码器
(1)基于固定长度解码器
(2)基于行分隔符解码器
(3)基于分隔符解码器
(4)基于长度域解码器
(1)基于固定长度解码器
判断当前字节容器可读字节是否小于固定长度。
//A decoder that splits the received ByteBufs by the fixed number of bytes. //For example, if you received the following four fragmented packets: //+---+----+------+----+ //| A | BC | DEFG | HI | //+---+----+------+----+ //A FixedLengthFrameDecoder (3) will decode them into the following three packets with the fixed length: //+-----+-----+-----+ //| ABC | DEF | GHI | //+-----+-----+-----+ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; //Creates a new instance. public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } //Create a frame out of the ByteBuf and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); } } }
(2)基于行分隔符解码器
基于行分隔符的拆包器可以同时处理n和rn两种类型的行分隔符,其处理逻辑分为非丢弃模式和丢弃模式、找到行分隔符和未找到行分隔符的情况。
一.非丢弃模式时找到行分隔符
首先新建一个帧,也就是ByteBuf frame。然后计算需要解码的数据包的长度和分隔符的长度。接着判断需要拆包的长度是否大于该拆包器允许的最大长度,如果大于,则丢弃这段数据,返回null。然后将一个完整的数据包取出,如果stripDelimiter在构造方法中被设置为false,则数据包含分隔符。
二.非丢弃模式时未找到行分隔符
首先取得当前字节容器的可读字节数,然后判断是否超出允许的最大长度。如果没超过最大长度,则直接返回null,字节容器的数据没有改变。如果已超过最大长度,则进入丢弃模式,设置discarding为true。
三.丢弃模式下找到行分隔符
这种情况下需要将分隔符之前的数据都丢弃。在计算出分隔符的长度之后,会通过移动字节容器的readerIndex指针把分隔符之前的数据全部丢弃,当然丢弃的数据也包括分隔符。经过这么一次丢弃后,后面就有可能是正常的数据包。于是设置discarding为false进入非丢弃模式,这样下次解码数据包时就会进入正常的解码流程。
四.丢弃模式下未找到行分隔符
由于当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,所以当前数据继续丢弃,移动字节容器的readerIndex指针。
//A decoder that splits the received {@link ByteBuf}s on line endings. //Both "n" and "rn" are handled. //For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder. public class LineBasedFrameDecoder extends ByteToMessageDecoder { //Maximum length of a frame we're willing to decode. private final int maxLength; //Whether or not to throw an exception as soon as we exceed maxLength. private final boolean failFast; private final boolean stripDelimiter; //True if we're discarding input because we're already over maxLength. private boolean discarding; private int discardedBytes; public LineBasedFrameDecoder(final int maxLength) { this(maxLength, true, false); } public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { this.maxLength = maxLength; this.failFast = failFast; this.stripDelimiter = stripDelimiter; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } //Create a frame out of the ByteBuf and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param buffer,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (!discarding) {//非丢弃模式 if (eol >= 0) {//找到行分隔符 //新建一个帧,也就是ByteBuf frame final ByteBuf frame; //计算需要解码的数据包的长度 final int length = eol - buffer.readerIndex(); //计算分隔符的长度 final int delimLength = buffer.getByte(eol) == 'r'? 2 : 1; //判断需要拆包的长度是否大于该拆包器允许的最大长度 if (length > maxLength) { //如果大于,则丢弃这段数据,返回null buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } //将一个完整的数据包取出 if (stripDelimiter) { frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { //如果stripDelimiter在构造方法中被设置为false,则数据包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else {//未找到行分隔符 //首先取得当前字节容器的可读字节数 final int length = buffer.readableBytes(); //然后判断是否超出允许的最大长度 if (length > maxLength) { //如果已超过最大长度,则进入丢弃模式,设置discarding为true discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true; if (failFast) { fail(ctx, "over " + discardedBytes); } } //如果没超过最大长度,则直接返回null,字节容器的数据没有改变 return null; } } else {//丢弃模式 if (eol >= 0) {//找到行分隔符 final int length = discardedBytes + eol - buffer.readerIndex(); //计算出分隔符的长度 final int delimLength = buffer.getByte(eol) == 'r'? 2 : 1; //把分隔符之前的数据全部丢弃,移动字节容器的readerIndex指针 buffer.readerIndex(eol + delimLength); discardedBytes = 0; //经过这么一次丢弃后,后面就有可能是正常的数据包 //于是设置discarding为false,这样下次解码数据包时就会进入正常的解码流程 discarding = false; if (!failFast) { fail(ctx, length); } } else {//未找到行分隔符 //当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完 //所以当前数据继续丢弃,移动字节容器的readerIndex指针 discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); } return null; } } private void fail(final ChannelHandlerContext ctx, int length) { fail(ctx, String.valueOf(length)); } private void fail(final ChannelHandlerContext ctx, String length) { ctx.fireExceptionCaught(new TooLongFrameException("frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')')); } //Returns the index in the buffer of the end of line found. //Returns -1 if no end of line was found in the buffer. private static int findEndOfLine(final ByteBuf buffer) { int i = buffer.forEachByte(ByteProcessor.FIND_LF); if (i > 0 && buffer.getByte(i - 1) == 'r') { i--; } return i; } }
(3)基于分隔符解码器
可以向基于分隔符解码器DelimiterBasedFrameDecoder传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的decode()方法和基于行分隔符解码器的decode()方法基本类似。
//A decoder that splits the received ByteBufs by one or more delimiters. public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { private final ByteBuf[] delimiters; private final int maxFrameLength; private final boolean stripDelimiter; private final boolean failFast; private boolean discardingTooLongFrame; private int tooLongFrameLength; private final LineBasedFrameDecoder lineBasedDecoder; ... //Creates a new instance. //@param maxFrameLength,the maximum length of the decoded frame. //A TooLongFrameException is thrown if the length of the frame exceeds this value. //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder //notices the length of the frame will exceed maxFrameLength regardless of //whether the entire frame has been read. //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read. //@param delimiters the delimiters public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) { validateMaxFrameLength(maxFrameLength); if (delimiters == null) { throw new NullPointerException("delimiters"); } if (delimiters.length == 0) { throw new IllegalArgumentException("empty delimiters"); } if (isLineBased(delimiters) && !isSubclass()) { lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else { this.delimiters = new ByteBuf[delimiters.length]; for (int i = 0; i < delimiters.length; i ++) { ByteBuf d = delimiters[i]; validateDelimiter(d); this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes()); } lineBasedDecoder = null; } this.maxFrameLength = maxFrameLength; this.stripDelimiter = stripDelimiter; this.failFast = failFast; } //Returns true if the delimiters are "n" and "rn". private static boolean isLineBased(final ByteBuf[] delimiters) { if (delimiters.length != 2) { return false; } ByteBuf a = delimiters[0]; ByteBuf b = delimiters[1]; if (a.capacity() < b.capacity()) { a = delimiters[1]; b = delimiters[0]; } return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == 'r' && a.getByte(1) == 'n' && b.getByte(0) == 'n'; } //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder private boolean isSubclass() { return getClass() != DelimiterBasedFrameDecoder.class; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } //Create a frame out of the {@link ByteBuf} and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param buffer,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); } //Try all delimiters and choose the delimiter which yields the shortest frame. int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } } if (minDelim != null) { int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { //We've just finished discarding a very large frame. //Go back to the initial state. discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } if (minFrameLength > maxFrameLength) { //Discard read frame. buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { //Discard the content of the buffer until a delimiter is found. tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { //Still discarding the buffer since a delimiter is not found. tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; } } private void fail(long frameLength) { if (frameLength > 0) { throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"); } else { throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding"); } } //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack. //-1 is returned if no needle is found in the haystack. private static int indexOf(ByteBuf haystack, ByteBuf needle) { for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) { int haystackIndex = i; int needleIndex; for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) { if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) { break; } else { haystackIndex ++; if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) { return -1; } } } if (needleIndex == needle.capacity()) { //Found the needle from the haystack! return i - haystack.readerIndex(); } } return -1; } private static void validateDelimiter(ByteBuf delimiter) { if (delimiter == null) { throw new NullPointerException("delimiter"); } if (!delimiter.isReadable()) { throw new IllegalArgumentException("empty delimiter"); } } private static void validateMaxFrameLength(int maxFrameLength) { if (maxFrameLength <= 0) { throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength); } } ... }
(4)基于长度域解码器
主要的逻辑步骤如下:
一.丢弃模式的处理
二.获取待拆数据包的大小
三.对数据包进行长度校验
四.跳过指定字节长度
五.抽取数据包
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private final ByteOrder byteOrder;//表示字节流表示的数据是大端还是小端,用于长度域的读取 private final int maxFrameLength;//表示数据包的最大长度 private final int lengthFieldOffset;//表示长度域的偏移量 private final int lengthFieldLength;//表示长度域的长度 private final int lengthFieldEndOffset;//表示紧跟长度域字段后面的第一字节在整个数据包中的偏移量 private final int lengthAdjustment;//表示数据包体长度调整大小,长度域只表示数据包体的长度 private final int initialBytesToStrip;//表示拿到完整的数据包之后,向业务解码器传递之前,应该跳过多少字节 private final boolean failFast;//默认为true,否则可能会OOM private boolean discardingTooLongFrame; private long tooLongFrameLength; private long bytesToDiscard; ... //Creates a new instance. //@param byteOrder,the ByteOrder of the length field //@param maxFrameLength,the maximum length of the frame. //If the length of the frame is greater than this value, TooLongFrameException will be thrown. //@param lengthFieldOffset,the offset of the length field //@param lengthFieldLength,the length of the length field //@param lengthAdjustment,the compensation value to add to the value of the length field //@param initialBytesToStrip,the number of first bytes to strip out from the decoded frame //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder notices the length of the frame //will exceed maxFrameLength regardless of whether the entire frame has been read. //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read. public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { ... this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } //Create a frame out of the {@link ByteBuf} and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { //步骤一开始:丢弃模式的处理 if (discardingTooLongFrame) { //如果当前处于丢弃模式,则先计算需要丢弃多少字节,取当前还需可丢弃字节和可读字节的最小值 long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes()); in.skipBytes(localBytesToDiscard);//进行丢弃 bytesToDiscard -= localBytesToDiscard; this.bytesToDiscard = bytesToDiscard; failIfNecessary(false); } //步骤一结束 //步骤二开始:获取待拆数据包的大小 //如果当前可读字节还没达到长度域的偏移,说明肯定是读不到长度域的,则直接不读 if (in.readableBytes() < lengthFieldEndOffset) { return null; } //计算长度域的实际字节偏移 int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; //拿到实际的未调整过的数据包长度 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); //如果拿到的长度为负数,则直接跳过长度域并抛出异常 if (frameLength < 0) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength); } //调整数据包的长度,后面统一做拆分 frameLength += lengthAdjustment + lengthFieldEndOffset; //步骤二结束 //步骤三开始:对数据包进行长度校验 //整个数据包的长度还没有长度域长,则直接抛出异常 if (frameLength < lengthFieldEndOffset) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset); } //数据包长度超出最大数据包长度,进入丢弃模式 if (frameLength > maxFrameLength) { long discard = frameLength - in.readableBytes(); tooLongFrameLength = frameLength; if (discard < 0) { //当前可读字节已达到frameLength,直接跳过frameLength字节 //丢弃之后,后面又可能就是一个合法的数据包了 in.skipBytes((int) frameLength); } else { //当前可读字节未达到frameLength,说明后面未读到的字节也需要丢弃,进入丢弃模式,先把当前累积的字节全部丢弃 discardingTooLongFrame = true; //bytesToDiscard表示还需要丢弃多少字节 bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } //调用failIfNecessary判断是否需要抛出异常 failIfNecessary(true); return null; } //步骤三结束 //步骤四开始:跳过指定字节长度 //never overflows because it's less than maxFrameLength int frameLengthInt = (int) frameLength; if (in.readableBytes() < frameLengthInt) { //如果可读字节还是小于数据包的长度,则返回,下次继续读取 return null; } if (initialBytesToStrip > frameLengthInt) { //如果跳过的字节大于数据包的长度,则抛异常 in.skipBytes(frameLengthInt); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } in.skipBytes(initialBytesToStrip); //步骤四结束 //步骤五开始:抽取数据包 //拿到当前累积数据的读指针 int readerIndex = in.readerIndex(); //拿到待抽取数据包的实际长度进行抽取 int actualFrameLength = frameLengthInt - initialBytesToStrip; //进行抽取数据 ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); //移动读指针 in.readerIndex(readerIndex + actualFrameLength); return frame; } protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { return buffer.retainedSlice(index, length); } //拿到实际的未调整过的数据包长度 //如果长度域代表的值表达的含义不是正常的int、short等类型,则可以重写这个方法 //比如有的长度域虽然是4字节,比如0x1234,但是它的含义是十进制的,即长度就是十进制的1234 protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) { buf = buf.order(order); long frameLength; switch (length) { case 1: frameLength = buf.getUnsignedByte(offset); break; case 2: frameLength = buf.getUnsignedShort(offset); break; case 3: frameLength = buf.getUnsignedMedium(offset); break; case 4: frameLength = buf.getUnsignedInt(offset); break; case 8: frameLength = buf.getLong(offset); break; default: throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)"); } return frameLength; } private void failIfNecessary(boolean firstDetectionOfTooLongFrame) { //不需要再丢弃后面的未读字节,就开始重置丢弃状态 if (bytesToDiscard == 0) { //Reset to the initial state and tell the handlers that the frame was too large. long tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; discardingTooLongFrame = false; //如果没有设置快速失败,或者设置了快速失败并且是第一次检测到大包错误,则抛出异常,让Handler处理 if (!failFast || failFast && firstDetectionOfTooLongFrame) { fail(tooLongFrameLength); } } else { //如果设置了快速失败,并且是第一次检测到打包错误,则抛出异常,让Handler处理 if (failFast && firstDetectionOfTooLongFrame) { fail(tooLongFrameLength); } } } protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { return buffer.retainedSlice(index, length); } private void fail(long frameLength) { if (frameLength > 0) { throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"); } else { throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + " - discarding"); } } ... }
6.writeAndFlush()方法的大体步骤
(1)writeAndFlush()方法的调用入口
(2)writeAndFlush()方法的执行流程
(1)writeAndFlush()方法的调用入口
入口通常是:ctx.channel().writeAndFlush()。
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //网络连接tcp三次握手后,就会建立和封装一个Channel(网络连接的通信管道) //此时这个Channel就可以实现一个激活 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel Active......"); ctx.channel().writeAndFlush("test5"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Channel Read: " + (String)msg); String response = "Hello World......"; ByteBuf responseByteBuf = Unpooled.buffer(); responseByteBuf.writeBytes(response.getBytes()); ctx.channel().writeAndFlush(responseByteBuf); System.out.println("Channel Write: " + response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel Read Complete......"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
(2)writeAndFlush()方法的执行流程
首先从tail结点开始往前传播。然后逐个调用ChannelHandler的write()方法,直到某个ChannelHandler不再往前传播write事件。接着逐个调用ChannelHandler的flush()方法,直到某个ChannelHandler不再往前传播flush事件。
一般而言,只要每个ChannelHandler都往下传播write事件和flush事件,那么最后都会传播到HeadContext结点的write()方法和flush()方法,然后分别执行unsafe.write()和unsafe.flush()将数据通过底层的unsafe写到JDK底层的Channel。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final DefaultChannelPipeline pipeline; ... @Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); } ... } public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel; protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } @Override public final ChannelFuture writeAndFlush(Object msg) { //从TailContext开始传播 //但TailContext没有重写writeAndFlush()方法 //所以会调用AbstractChannelHandlerContext的writeAndFlush()方法 return tail.writeAndFlush(msg); } ... } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... @Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) throw new NullPointerException("msg"); if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { //反向遍历链表进行查找 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); //最终都会由Reactor线程处理Channel的数据读写 if (executor.inEventLoop()) { if (flush) { //调用结点的invokeWriteAndFlush()方法 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传 //即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播 invokeWrite0(msg, promise); //逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传 //即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播 invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { //逐个调用,最终回到HeadContext的write()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } private void invokeFlush0() { try { //逐个调用,最终回到HeadContext的flush()方法 ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } } ... } public class DefaultChannelPipeline implements ChannelPipeline { ... final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } } ... } //Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext. public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } //Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
7.MessageToByteEncoder的编码步骤
(1)编码的具体步骤
(2)编码步骤的总结
(3)子类实现编码的例子
(1)编码的具体步骤
步骤一:判断对象
判断当前ChannelHandler结点能否处理写入传入的Java对象。如果能处理,则往下执行,否则直接传递给下一个ChannelHandler结点进行处理。
步骤二:分配内存
给新创建的ByteBuf对象分配一块内存空间,这块内存空间将会存放由Java对象转换来的字节数据。
步骤三:调用encode
子类会实现MessageToByteEncoder的抽象方法encode()来定义自己的编码协议,子类的encode()方法会将Java对象转换来的字节数据写入ByteBuf。
步骤四:释放对象
由于传入的Java对象已经转换成ByteBuf字节流了,所以传入的Java对象已不再使用可进行释放。
步骤五:传播数据
当子类的encode()方法将数据写入了ByteBuf对象以及释放完对象之后,则会往前一个ChannelHandler结点传播该ByteBuf对象,否则往前一个ChannelHandler结点传播空对象。
步骤六:释放内存
如果出现异常或者ByteBuf没有写入数据或者ByteBuf在pipeline中已处理完,则释放分配给ByteBuf对象的内存。
//ChannelOutboundHandlerAdapter which encodes message in a stream-like fashion from one message to an ByteBuf. //Example implementation which encodes Integers to a ByteBuf. //public class IntegerEncoder extends MessageToByteEncoder<Integer> { // @code @Override // public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { // out.writeInt(msg); // } //} public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { private final TypeParameterMatcher matcher; private final boolean preferDirect; protected MessageToByteEncoder() { this(true); } protected MessageToByteEncoder(Class<? extends I> outboundMessageType) { this(outboundMessageType, true); } //Create a new instance which will try to detect the types to match out of the type parameter of the class. //@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. //If false is used it will allocate a heap ByteBuf, which is backed by an byte array. protected MessageToByteEncoder(boolean preferDirect) { matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); this.preferDirect = preferDirect; } //Create a new instance //@param outboundMessageType,The tpye of messages to match //@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. //If false is used it will allocate a heap ByteBuf, which is backed by an byte array. protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) { matcher = TypeParameterMatcher.get(outboundMessageType); this.preferDirect = preferDirect; } //Returns true if the given message should be handled. //If false it will be passed to the next ChannelOutboundHandler in the ChannelPipeline. public boolean acceptOutboundMessage(Object msg) throws Exception { return matcher.match(msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { //步骤一:判断当前ChannelHandler能否处理写入的消息 if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg;//强制转换 //步骤二:给ByteBuf对象分配内存 buf = allocateBuffer(ctx, cast, preferDirect); try { //步骤三:调用子类实现的encode()方法 encode(ctx, cast, buf); } finally { //步骤四:释放对象 //既然自定义的Java对象msg已经转换为ByteBuf对象了,那么该对象已经没有用,需要释放掉了 //注意:当传入的msg的类型是ByteBuf类型时,则不需要释放 ReferenceCountUtil.release(cast); } //步骤五:如果buf中写入了数据,就把buf传到下一个ChannelHandler结点 if (buf.isReadable()) { ctx.write(buf, promise); } else { //步骤六:如果buf中没有写入数据,则释放buf,并将一个空数据传到下一个ChannelHandler结点 buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release();//当buf在pipeline中处理完了,需要进行释放 } } } //Allocate a ByteBuf which will be used as argument of #encode(ChannelHandlerContext, I, ByteBuf). //Sub-classes may override this method to returna ByteBuf with a perfect matching initialCapacity. protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } } //Encode a message into a ByteBuf. //This method will be called for each written message that can be handled by this encoder. //@param ctx,the ChannelHandlerContext which this MessageToByteEncoder belongs to //@param msg,the message to encode //@param out,the ByteBuf into which the encoded message will be written //@throws Exception,is thrown if an error accour protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; }
(2)编码步骤的总结
在MessageToByteEncoder的编码过程中,首先会判断当前ChannelHandler能否处理传入的Java对象,如果能处理就对新创建的ByteBuf对象分配一块内存空间。然后由子类的encode()方法实现具体的编码协议,并且把编码后的数据存放到分配给ByteBuf对象的内存空间中。最后把ByteBuf对象往前一个ChannelHandler结点进行传播。
如果在编码的过程中出现异常,那么就把已申请出来的、分配给ByteBuf对象的内存空间进行释放。
如果传入的Java对象就是一个ByteBuf对象,那么Netty在自定义编码结束后,会自动帮忙释放该对象,不需要在子类中对该对象进行释放。
(3)子类实现编码的例子
下面的Encoder便实现了将自定义的Response对象转换为字节流并写到Socket底层的效果。
public class Encoder extends MessageToByteEncoder<Response> { protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception { out.writeByte(response.getVersion()); out.writeInt(4+ response.getData().length); out.writeBytes(response.getData()); } }
8.unsafe.write()将数据添加到写缓冲区
(1)unsafe.write()的入口
(2)unsafe.write()的主要逻辑
(3)写缓冲区(写队列)的数据结构
(1)unsafe.write()的入口
不管是ctx.channel().write()还是ctx.write(),最终都会来到pipeline中的head结点。
public class DefaultChannelPipeline implements ChannelPipeline { ... final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } } ... }
(2)unsafe.write()的主要逻辑
unsafe.write()方法将数据添加到写缓冲区(写队列)的主要逻辑如下。
一.Direct化ByteBuf对象
如果传进来的ByteBuf对象不是堆外内存,那么就直接转换成堆外内存,并且估算出其大小。
二.添加到写缓冲区
转换成堆外内存的ByteBuf对象首先会被封装成一个Entry对象,然后再将该Entry对象添加到写缓冲区,其中会通过几个指针来标识写缓冲区的状态。
三.设置写状态
如果内存不足,那么是不可以一直往写缓冲区里添加ByteBuf对象的。如果写缓冲区已经大于默认的64KB的大小,则会通过自旋 + CAS设置当前Channel为不可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final DefaultChannelPipeline pipeline; ... protected abstract class AbstractUnsafe implements Unsafe { //写缓冲区(写队列) private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); ... @Override public final void write(Object msg, ChannelPromise promise) { //确保该方法的调用是在Reactor线程中 assertEventLoop(); //写缓冲区 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... int size; try { //转换成堆外内存 msg = filterOutboundMessage(msg); //估算出需要写入的ByteBuf的size size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //将转换成堆外内存的msg添加到写缓冲区outboundBuffer outboundBuffer.addMessage(msg, size, promise); } ... } ... } public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... @Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } ... }
(3)写缓冲区(写队列)的数据结构
ChannelOutboundBuffer里的数据结构是一个单向链表,单向链表的每个结点都是一个Entry对象。在一个Entry对象中会包含着待写出的ByteBuf对象及消息回调promise。flushedEntry指针表示第一个被写入Socket缓冲区的结点,unflushedEntry指针表示第一个未被写入Socket缓冲区的结点,tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个结点。
初次调用ChannelOutboundBuffer的addMessage()方法后,flushedEntry指针指向NULL,unflushedEntry指针和tailEntry指针都指向新添加的结点。调用多次ChannelOutboundBuffer的addMessage()方法后,如果flushedEntry指针一直指向NULL,则表示现在还没有结点的ByteBuf对象写出到Socket缓冲区。如果unflushedEntry指针之后有n个结点,则表示当前还有n个结点的ByteBuf对象还没写出到Socket缓冲区。
public final class ChannelOutboundBuffer { private final Channel channel; //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) //The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry; //The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry; //The Entry which represents the tail of the buffer private Entry tailEntry; ... ChannelOutboundBuffer(AbstractChannel channel) { this.channel = channel; } //Add given message to this ChannelOutboundBuffer. //The given {@link ChannelPromise} will be notified once the message was written. public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } //increment pending bytes after adding message to the unflushed arrays. incrementPendingOutboundBytes(size, false); } static final class Entry { private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @Override protected Entry newObject(Handle handle) { return new Entry(handle); } }; private final Handle<Entry> handle; Entry next; Object msg; ByteBuffer[] bufs; ByteBuffer buf; ChannelPromise promise; long progress; long total; int pendingSize; int count = -1; boolean cancelled; private Entry(Handle<Entry> handle) { this.handle = handle; } static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; } ... } }
9.unsafe.flush()刷新写缓冲区的数据
(1)unsafe.flush()的入口
(2)unsafe.flush()的主要逻辑
(1)unsafe.flush()的入口
不管是ctx.channel().flush()还是ctx.flush(),最终都会来到pipeline中的head结点。
public class DefaultChannelPipeline implements ChannelPipeline { ... final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } } ... }
(2)unsafe.flush()的主要逻辑
步骤一:设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,并统计需要刷新的Entry结点的数量。
步骤二:遍历写缓冲区的Entry结点把对应的ByteBuf对象写到Socket,然后移除Entry结点。如果写缓冲区大小已经小于32KB,则通过自旋 + CAS设置Channel为可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final DefaultChannelPipeline pipeline; ... protected abstract class AbstractUnsafe implements Unsafe { private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); ... public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } //步骤一 outboundBuffer.addFlush(); //步骤二 flush0(); } protected void flush0() { final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... doWrite(outboundBuffer); ... } } //Flush the content of the given buffer to the remote peer. protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception; } public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { //默认自旋16次,以提高内存使用率和写的吞吐量 int writeSpinCount = config().getWriteSpinCount(); do { Object msg = in.current(); if (msg == null) { //重新注册,不关注OP_WRITE事件 clearOpWrite(); return; } writeSpinCount -= doWriteInternal(in, msg); } while(writeSpinCount > 0); incompleteWrite(setOpWrite); } private int doWriteInternal(ChannelOutboundBuffer in, Object msg) { ... ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { //从写缓冲区(写队列)中移除结点 in.remove(); return 0; } //把ByteBuf对象写到Socket里 final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (!buf.isReadable()) { //从写缓冲区(写队列)中移除结点 in.remove(); } return 1; } ... } protected final void clearOpWrite() { final SelectionKey key = selectionKey(); //Check first if the key is still valid as it may be canceled as part of the deregistration from the EventLoop. if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } } @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); } ... } public final class ChannelOutboundBuffer { private final Channel channel; //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) //The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry; //The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry; //The Entry which represents the tail of the buffer private Entry tailEntry; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER; @SuppressWarnings("UnusedDeclaration") private volatile int unwritable; //The number of flushed entries that are not written yet private int flushed; ... //设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点, //并统计需要刷新的Entry结点的数量 public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++;//所要flush的结点数 entry = entry.next; } while (entry != null); unflushedEntry = null; } } public boolean remove() { //获取当前正在被flush的结点 Entry e = flushedEntry; Object msg = e.msg; //获取该结点的回调对象 ChannelPromise promise = e.promise; int size = e.pendingSize; //从写缓冲队列中移除结点 removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); //如果写缓冲区大小小于32KB,就通过自旋+CAS设置Channel状态为可写 decrementPendingOutboundBytes(size, false, true); } //回收实体 e.recycle(); return true; } private void removeEntry(Entry e) { if (-- flushed == 0) { flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { flushedEntry = e.next; } } //Return the current message to write or null if nothing was flushed before and so is ready to be written. public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; } //Notify the ChannelPromise of the current message about writing progress. public void progress(long amount) { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { long progress = e.progress + amount; e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } } private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } } private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } ... }
10.如何把对象变成字节流写到unsafe底层
当调用ctx.channel().writeAndFlush(user)将自定义的User对象沿着整个Pipeline进行传播时:
首先会调用tail结点的write()方法开始往前传播,传播到一个继承自MessageToByteEncoder的结点。该结点会实现MessageToByteEncoder的encode()方法来把自定义的User对象转换成一个ByteBuf对象。转换的过程首先会由MessageToByteEncoder分配一个ByteBuf对象,然后再调用其子类实现的抽象方法encode()将User对象填充到ByteBuf对象中。填充完之后继续调用write()方法把该ByteBuf对象往前进行传播,默认下最终会传播到head结点。
其中head结点的write()方法会通过底层的unsafe进行如下处理:把当前的ByteBuf对象添加到unsafe维护的一个写缓冲区里,同时计算写缓冲区大小是否超过64KB。如果写缓冲区大小超过了64KB,则设置当前Channel不可写。完成write()方法的传播后,head结点的unsafe对象维护的写缓冲区便对应着一个ByteBuf队列,它是一个单向链表。
然后会调用tail结点的flush()方法开始往前传播,默认下最终会传播到head结点。head结点在接收到flush事件时会通过底层的unsafe进行如下处理:首先进行指针调整,然后通过循环遍历从写缓冲区里把ByteBuf对象取出来。每拿出一个ByteBuf对象都会把它转化为JDK底层可以接受的ByteBuffer对象,最终通过JDK的Channel把该ByteBuffer对象写出去。每写完一个ByteBuffer对象都会把写缓冲区里的当前ByteBuf所在的Entry结点进行删除,并且判断如果当前写缓冲区里的大小已经小于32KB就通过自旋 + CAS重新设置Channel为可写。