//Implementations are responsible to allocate buffers. Implementations of this interface are expected to be thread-safe. public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; //Allocate a ByteBuf. If it is a direct or heap buffer depends on the actual implementation. ByteBuf buffer(); //Allocate a ByteBuf with the given initial capacity. //If it is a direct or heap buffer depends on the actual implementation. ByteBuf buffer(int initialCapacity); //Allocate a ByteBuf} with the given initial capacity and the given maximal capacity. //If it is a direct or heap buffer depends on the actual implementation. ByteBuf buffer(int initialCapacity, int maxCapacity); //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O. ByteBuf ioBuffer(); //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O. ByteBuf ioBuffer(int initialCapacity); //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O. ByteBuf ioBuffer(int initialCapacity, int maxCapacity); //Allocate a heap ByteBuf. ByteBuf heapBuffer(); //Allocate a heap ByteBuf with the given initial capacity. ByteBuf heapBuffer(int initialCapacity); //Allocate a heap ByteBuf with the given initial capacity and the given maximal capacity. ByteBuf heapBuffer(int initialCapacity, int maxCapacity); //Allocate a direct ByteBuf. ByteBuf directBuffer(); //Allocate a direct ByteBuf with the given initial capacity. ByteBuf directBuffer(int initialCapacity); //Allocate a direct ByteBuf with the given initial capacity and the given maximal capacity. ByteBuf directBuffer(int initialCapacity, int maxCapacity); //Allocate a CompositeByteBuf. //If it is a direct or heap buffer depends on the actual implementation. CompositeByteBuf compositeBuffer(); //Allocate a CompositeByteBuf with the given maximum number of components that can be stored in it. //If it is a direct or heap buffer depends on the actual implementation. CompositeByteBuf compositeBuffer(int maxNumComponents); //Allocate a heap CompositeByteBuf. CompositeByteBuf compositeHeapBuffer(); //Allocate a heap CompositeByteBuf with the given maximum number of components that can be stored in it. CompositeByteBuf compositeHeapBuffer(int maxNumComponents); //Allocate a direct CompositeByteBuf. CompositeByteBuf compositeDirectBuffer(); //Allocate a direct CompositeByteBuf with the given maximum number of components that can be stored in it. CompositeByteBuf compositeDirectBuffer(int maxNumComponents); //Returns true if direct ByteBuf's are pooled boolean isDirectBufferPooled(); //Calculate the new capacity of a ByteBuf that is used when a ByteBuf needs to expand by the minNewCapacity with maxCapacity as upper-bound. int calculateNewCapacity(int minNewCapacity, int maxCapacity); }
//Skeletal ByteBufAllocator implementation to extend. public abstract class AbstractByteBufAllocator implements ByteBufAllocator { private static final int DEFAULT_INITIAL_CAPACITY = 256; private final boolean directByDefault; private final ByteBuf emptyBuf; //Instance use heap buffers by default protected AbstractByteBufAllocator() { this(false); } //Create new instance //@param preferDirect true if #buffer(int) should try to allocate a direct buffer rather than a heap buffer protected AbstractByteBufAllocator(boolean preferDirect) { directByDefault = preferDirect && PlatformDependent.hasUnsafe(); emptyBuf = new EmptyByteBuf(this); } @Override public ByteBuf buffer() { if (directByDefault) { return directBuffer(); } return heapBuffer(); } @Override public ByteBuf directBuffer() { return directBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE); } @Override public ByteBuf heapBuffer() { return heapBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE); } @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } @Override public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); } private static void validate(int initialCapacity, int maxCapacity) { if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)"); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format("initialCapacity: %d (expected: not greater than maxCapacity(%d)", initialCapacity, maxCapacity)); } } //Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity. protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity); //Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity. protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity); ... }
//Simplistic ByteBufAllocator implementation that does not pool anything. public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { ... @Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); } ... } //1.使用UnpooledUnsafeHeapByteBuf通过unsafe创建一个非池化的堆内存ByteBuf final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf { ... //Creates a new heap buffer with a newly allocated byte array. //@param initialCapacity the initial capacity of the underlying byte array //@param maxCapacity the max capacity of the underlying byte array UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override public byte getByte(int index) { checkIndex(index); return _getByte(index); } @Override protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(array, index); } ... } //2.使用UnpooledHeapByteBuf通过非unsafe创建一个非池化的堆内存ByteBuf public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; byte[] array; ... //Creates a new heap buffer with a newly allocated byte array. //@param initialCapacity the initial capacity of the underlying byte array //@param maxCapacity the max capacity of the underlying byte array protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { this(alloc, new byte[initialCapacity], 0, 0, maxCapacity); } private UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) { ... this.alloc = alloc; setArray(initialArray);//设置直接创建的字节数组 setIndex(readerIndex, writerIndex);//设置读写指针为0 } private void setArray(byte[] initialArray) { array = initialArray; ... } @Override public byte getByte(int index) { ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { return HeapByteBufUtil.getByte(array, index); } ... } //3.使用UnsafeByteBufUtil.newUnsafeDirectByteBuf()创建一个非池化的直接内存ByteBuf final class UnsafeByteBufUtil { ... static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { if (PlatformDependent.useDirectBufferNoCleaner()) { return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity); } return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity); } } public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; ByteBuffer buffer; ... //Creates a new direct buffer. //@param initialCapacity the initial capacity of the underlying direct buffer //@param maxCapacity the maximum capacity of the underlying direct buffer protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { ... this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false); } //Allocate a new direct ByteBuffer with the given initialCapacity. protected ByteBuffer allocateDirect(int initialCapacity) { //使用ByteBuffer直接分配一个DirectByteBuffer对象 return ByteBuffer.allocateDirect(initialCapacity); } final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { ... this.buffer = buffer; ... } } //4.使用UnpooledDirectByteBuf创建一个非池化的直接内存ByteBuf public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private ByteBuffer buffer; ... //Creates a new direct buffer. //@param initialCapacity the initial capacity of the underlying direct buffer //@param maxCapacity the maximum capacity of the underlying direct buffer protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { ... this.alloc = alloc; //使用ByteBuffer直接分配一个DirectByteBuffer对象 setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); } private void setByteBuffer(ByteBuffer buffer) { ... this.buffer = buffer; ... } } public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> { ... //Allocates a new direct byte buffer. public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } ... } //5.unsafe和非unsafe的区别 final class UnsafeByteBufUtil { //unsafe会调用这个方法 static byte getByte(byte[] array, int index) { return PlatformDependent.getByte(array, index); } ... } final class HeapByteBufUtil { //非unsafe会调用这个方法 static byte getByte(byte[] memory, int index) { return memory[index]; } ... }