大纲
1.Netty的两大性能优化工具
2.FastThreadLocal的实现之构造方法
3.FastThreadLocal的实现之get()方法
4.FastThreadLocal的实现之set()方法
5.FastThreadLocal的总结
6.Recycler的设计理念
7.Recycler的使用
8.Recycler的四个核心组件
9.Recycler的初始化
10.Recycler的对象获取
11.Recycler的对象回收
12.异线程收割对象
13.Recycler的总结
14.Netty设计模式之单例模式
15.Netty设计模式之策略模式
16.Netty设计模式之装饰器模式
17.Netty设计模式之观察者模式
18.Netty设计模式之迭代器模式
19.Netty设计模式之责任链模式
1.Netty的两大性能优化工具
(1)FastThreadLocal
(2)Recycler
(1)FastThreadLocal
FastThreadLocal的作用与ThreadLocal相当,但比ThreadLocal更快。ThreadLocal的作用是多线程访问同一变量时能够通过线程本地化的方式避免多线程竞争、实现线程隔离。
Netty的FastThreadLocal重新实现了JDK的ThreadLocal的功能,且访问速度更快,但前提是使用FastThreadLocalThread线程。
(2)Recycler
Recycler实现了一个轻量级的对象池机制。对象池的作用是一方面可以实现快速创建对象,另一方面可以避免反复创建对象、减少YGC压力。
Netty使用Recycler的方式来获取ByteBuf对象的原因是:ByteBuf对象的创建在Netty里是非常频繁的且又比较占空间。但是如果对象比较小,使用对象池也不是那么划算。
2.FastThreadLocal的实现之构造方法
Netty为FastThreadLocal量身打造了FastThreadLocalThread和InternalThreadLocalMap两个重要的类,FastThreadLocal的构造方法会设置一个由final修饰的int型变量index,该index变量的值会由InternalThreadLocalMap的静态方法通过原子类来进行获取。
//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason. //Note that the fast path is only possible on threads that extend FastThreadLocalThread, //because it requires a special field to store the necessary state. //An access by any other kind of thread falls back to a regular ThreadLocal. //@param <V> the type of the thread-local variable public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置存所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } ... } //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { public static final Object UNSET = new Object(); public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException("too many thread-local indexed variables"); } return index; } ... } //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); //Used by FastThreadLocal Object[] indexedVariables; ... }
3.FastThreadLocal的实现之get()方法
(1)FastThreadLocalThread的关键属性
(2)Thread与ThreadLocalMap
(3)FastThreadLocal.get()方法的实现流程
(4)从InternalThreadLocalMap中获取值
(1)FastThreadLocalThread的关键属性
FastThreadLocal继承了Thread类,每个FastThreadLocalThread线程对应一个InternalThreadLocalMap实例。只有FastThreadLocal和FastThreadLocalThread线程组合在一起使用的时候才能发挥出FastThreadLocal的性能优势。
public class FastThreadLocalThread extends Thread { private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread() { } public FastThreadLocalThread(Runnable target) { super(target); } public FastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); } public FastThreadLocalThread(String name) { super(name); } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); } public FastThreadLocalThread(Runnable target, String name) { super(target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); } //Returns the internal data structure that keeps the thread-local variables bound to this thread. //Note that this method is for internal use only, and thus is subject to change at any time. public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } //Sets the internal data structure that keeps the thread-local variables bound to this thread. //Note that this method is for internal use only, and thus is subject to change at any time. public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { //这个方法会在调用InternalThreadLocalMap.get()方法时被调用 //具体就是通过fastGet()方法设置FastThreadLocalThread一个新创建的InternalThreadLocalMap对象 this.threadLocalMap = threadLocalMap; } }
(2)Thread与ThreadLocalMap
注意:每个Thread线程对应一个ThreadLocalMap实例。ThreadLocal.ThreadLocalMap是采用线性探测法来解决哈希冲突的。
public class Thread implements Runnable { ... //ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class. ThreadLocal.ThreadLocalMap threadLocals = null; ... } public class ThreadLocal<T> { ... //Returns the value in the current thread's copy of this thread-local variable. //If the variable has no value for the current thread, //it is first initialized to the value returned by an invocation of the #initialValue method. //@return the current thread's value of this thread-local public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } //ThreadLocalMap is a customized hash map suitable only for maintaining thread local values. //No operations are exported outside of the ThreadLocal class. //The class is package private to allow declaration of fields in class Thread. //To help deal with very large and long-lived usages, the hash table entries use WeakReferences for keys. //However, since reference queues are not used, //stale entries are guaranteed to be removed only when the table starts running out of space. static class ThreadLocalMap { //The entries in this hash map extend WeakReference, using its main ref field as the key (which is always a ThreadLocal object). //Note that null keys (i.e. entry.get() == null) mean that the key is no longer referenced, so the entry can be expunged from table. //Such entries are referred to as "stale entries" in the code that follows. static class Entry extends WeakReference<ThreadLocal<?>> { //The value associated with this ThreadLocal. Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } //The initial capacity -- MUST be a power of two. private static final int INITIAL_CAPACITY = 16; //The table, resized as necessary. //table.length MUST always be a power of two. private Entry[] table; //The number of entries in the table. private int size = 0; //The next size value at which to resize. private int threshold; // Default to 0 ... } ... }
(3)FastThreadLocal.get()方法的实现流程
首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap。然后通过唯一标识当前FastThreadLocal对象的索引index,从InternalThreadLocalMap中取出数组元素值。如果取出的值是缺省的,则对该数组元素进行初始化并将当前FastThreadLocal对象保存到待清理的Set中。
//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason. //Note that the fast path is only possible on threads that extend FastThreadLocalThread, //because it requires a special field to store the necessary state. //An access by any other kind of thread falls back to a regular ThreadLocal. //@param <V> the type of the thread-local variable public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置 private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public FastThreadLocal() { //每new一个FastThreadLocal,index就会自增1,所以index是FastThreadLocal的唯一身份ID index = InternalThreadLocalMap.nextVariableIndex(); } //Returns the current value for the current thread @SuppressWarnings("unchecked") public final V get() { //首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //从数组中取出index位置的元素 Object v = threadLocalMap.indexedVariable(index); //如果获取到的元素不是一个UNSET即一个new Object(),则返回该元素 if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //如果获取到的数组元素是缺省对象,则对threadLocalMap在index位置的元素值执行初始化操作 return initialize(threadLocalMap); } private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { //通过initialValue()方法对threadLocalMap在index位置的元素值进行初始化 //initialValue()方法可以被FastThreadLocal<V>的子类重写 v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } //设置threadLocalMap数组在下标index处的元素值 threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; } //Returns the initial value for this thread-local variable. protected V initialValue() throws Exception { return null; } private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { //获取threadLocalMap数组下标为0的元素 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除 if (v == InternalThreadLocalMap.UNSET || v == null) { //创建FastThreadLocal类型的Set集合 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); //将variablesToRemove这个Set集合设置到数组下标为0的位置 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { //强转获得Set集合 variablesToRemove = (Set<FastThreadLocal<?>>) v; } variablesToRemove.add(variable); } ... }
(4)从InternalThreadLocalMap中获取值
InternalThreadLocalMap.get()方法会分别通过fastGet()和slowGet()来获取一个InternalThreadLocalMap对象。
如果当前线程是普通线程则调用slowGet()方法,让每个线程通过JDK的ThreadLocal来拿到一个InternalThreadLocalMap对象,所以如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的。
如果当前线程是FastThreadLocalThread线程则调用fastGet()方法,由于FastThreadLocalThread线程维护了一个InternalThreadLocalMap对象,所以fastGet()方法相当于直接从线程对象里把这个InternalThreadLocalMap拿出来。
注意:Reactor线程所创建的线程实体便是FastThreadLocalThread线程。
//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { public static final Object UNSET = new Object(); private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32; ... private InternalThreadLocalMap() { //设置父类的成员变量indexedVariables的初始值 super(newIndexedVariableTable()); } private static Object[] newIndexedVariableTable() { //初始化一个32个元素的数组 Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE]; //每个元素都是UNSET值 Arrays.fill(array, UNSET); return array; } //index是当前访问的FastThreadLocal在JVM里的索引 //indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的 public Object indexedVariable(int index) { Object[] lookup = indexedVariables; //直接通过索引来取出对象 return index < lookup.length? lookup[index] : UNSET; } public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; } private static InternalThreadLocalMap slowGet() { //如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的 //因为此时返回的是一个通过ThreadLocal维护的InternalThreadLocalMap对象 ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; } ... } class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); //Used by FastThreadLocal Object[] indexedVariables; ... UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { this.indexedVariables = indexedVariables; } ... }
InternalThreadLocalMap的indexedVariable()方法中的入参index索引,指的是每一个FastThreadLocal对象在JVM里的标识ID,通过自增的方式由原子类进行创建。
对于当前线程维护的InternalThreadLocalMap对象里的数组indexedVariables,可以通过下标方式indexedVariables[index]获取一个Object。
初始化InternalThreadLocalMap对象时,会初始化一个32个元素的indexedVariables数组,每一个元素都是UNSET值。
4.FastThreadLocal的实现之set()方法
(1)FastThreadLocal.set()方法的实现流程
(2)向InternalThreadLocalMap设置值
(3)InternalThreadLocalMap的扩容原理
(1)FastThreadLocal.set()方法的实现流程
首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap。然后通过唯一标识当前FastThreadLocal对象的索引index,给InternalThreadLocalMap数组中对应的位置设置值。接着将当前FastThreadLocal对象保存到待清理的Set中,如果设置的值是一个缺省的Object对象,则通过remove()方法删除InternalThreadLocalMap数组中对应位置的元素。
public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置 private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); ... //Set the value for the current thread. public final void set(V value) { if (value != InternalThreadLocalMap.UNSET) { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); setKnownNotUnset(threadLocalMap, value); } else { remove(); } } private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { //将当前FastThreadLocal对象对应的数据添加到当前线程维护的InternalThreadLocalMap中 if (threadLocalMap.setIndexedVariable(index, value)) { //将当前FastThreadLocal对象保存到待清理的Set中 addToVariablesToRemove(threadLocalMap, this); } } private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { //获取threadLocalMap数组下标为0的元素 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除 if (v == InternalThreadLocalMap.UNSET || v == null) { //创建FastThreadLocal类型的Set集合 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); //将variablesToRemove这个Set集合设置到数组下标为0的位置 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { //强转获得Set集合 variablesToRemove = (Set<FastThreadLocal<?>>) v; } variablesToRemove.add(variable); } //Sets the value to uninitialized; //a proceeding call to get() will trigger a call to initialValue(). public final void remove() { remove(InternalThreadLocalMap.getIfSet()); } //Sets the value to uninitialized for the specified thread local map; //a proceeding call to get() will trigger a call to initialValue(). //The specified thread local map must be for the current thread. @SuppressWarnings("unchecked") public final void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap == null) { return; } //删除数组下标index位置对应的value Object v = threadLocalMap.removeIndexedVariable(index); //从数组下标0的位置取出Set集合,删除当前FastThreadLocal对象 removeFromVariablesToRemove(threadLocalMap, this); if (v != InternalThreadLocalMap.UNSET) { try { //和initValue()方法一样,可以被FastThreadLocal的子类重写 onRemoval((V) v); } catch (Exception e) { PlatformDependent.throwException(e); } } } //Returns the initial value for this thread-local variable. protected V initialValue() throws Exception { return null; } //Invoked when this thread local variable is removed by #remove(). //Be aware that #remove() is not guaranteed to be called when the `Thread` completes which means //you can not depend on this for cleanup of the resources in the case of `Thread` completion. protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { } ... }
FastThreadLocal的addToVariablesToRemove()方法解释了:为什么InternalThreadLocalMap的value数据是从下标1的位置开始进行存储的,因为下标0的位置存储的是一个Set集合,集合里面的元素是FastThreadLocal对象。
(2)向InternalThreadLocalMap设置值
//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { public static final Object UNSET = new Object(); ... //index是当前访问的FastThreadLocal在JVM里的索引 //indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的 public Object indexedVariable(int index) { Object[] lookup = indexedVariables; //直接通过索引来取出对象 return index < lookup.length? lookup[index] : UNSET; } //如果设置的是新值,则返回true;如果设置的是旧值,则返回false; public boolean setIndexedVariable(int index, Object value) { Object[] lookup = indexedVariables; if (index < lookup.length) { Object oldValue = lookup[index]; //直接将数组index位置的元素设置为value,时间复杂度为O(1) lookup[index] = value; return oldValue == UNSET; } else { //扩容数组 expandIndexedVariableTableAndSet(index, value); return true; } } //通过无符号右移和位或运算实现2^n * 2,这与HashMap的扩容原理是一样的 private void expandIndexedVariableTableAndSet(int index, Object value) { Object[] oldArray = indexedVariables; final int oldCapacity = oldArray.length; int newCapacity = index;//假设index = 16,也就是1000 newCapacity |= newCapacity >>> 1;//变为1100 newCapacity |= newCapacity >>> 2;//变为1111 newCapacity |= newCapacity >>> 4;//还是1111 newCapacity |= newCapacity >>> 8;//还是1111 newCapacity |= newCapacity >>> 16;//还是1111 newCapacity ++;//变为10000 Object[] newArray = Arrays.copyOf(oldArray, newCapacity); Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); newArray[index] = value; indexedVariables = newArray; } //根据不同的Thread返回InternalThreadLocalMap public static InternalThreadLocalMap getIfSet() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return ((FastThreadLocalThread) thread).threadLocalMap(); } return slowThreadLocalMap.get(); } ... }
(3)InternalThreadLocalMap的扩容原理
InternalThreadLocalMap的数组扩容原理和JDK的HashMap扩容原理是一样的。注意:InternalThreadLocalMap是以index为基准进行扩容而不是以原数组长度为基准进行扩容。
假设初始化了70个FastThreadLocal,但这些FastThreadLocal从来没有调用过set()方法,此时InternalThreadLocalMap数组的长度还是默认的32。当index = 70的FastThreadLocal调用set()方法时,就不能以32为基准进行两倍扩容了。
5.FastThreadLocal的总结
(1)FastThreadLocal不一定比ThreadLocal快
(2)FastThreadLocal并不会浪费很大的空间
(3)FastThreadLocal如何实现高效查找
(4)FastThreadLocal具有更高的安全性
(1)FastThreadLocal不一定比ThreadLocal快
只有FastThreadLocalThread线程使用FastThreadLocal才会更快,如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的。
(2)FastThreadLocal并不会浪费很大的空间
虽然FastThreadLocal采用了空间换时间的思路,但其设计之初就认为不会存在特别多的FastThreadLocal对象,而且在数据中没有使用的元素只是存放了同一个缺省对象的引用(UNSET),所以并不会占用太多内存空间。
(3)FastThreadLocal如何实现高效查找
一.FastThreadLocal
在定位数据时可以直接根据数组下标index进行定位,时间复杂度为O(1),所以在数据较多时也不会存在Hash冲突。在进行数组扩容时只需要把数组容量扩容2倍,然后再把原数据拷贝到新数组。
二.ThreadLocal
在定位数据时是根据哈希算法进行定位的,在数据较多时容易发生Hash冲突。发生Hash冲突时采用线性探测法解决Hash冲突要不断向下寻找,效率较低。在进行数组扩容时由于采用了哈希算法,所以在数组扩容后需要再做一轮rehash。
(4)FastThreadLocal具有更高的安全性
FastThreadLocal不仅提供了remove()方法可以主动清除对象,而且在线程池场景中(也就是SingleThreadEventExecutor和DefaultThreadFactory),Netty还封装了FastThreadLocalRunnable。
FastThreadLocalRunnable最后会执行FastThreadLocal的removeAll()方法将Set集合中所有的FastThreadLocal对象都清理掉。
ThreadLocal使用不当可能会造成内存泄露,只能等待线程销毁。或者只能通过主动检测的方式防止内存泄露,从而增加了额外的开销。
6.Recycler的设计理念
(1)对象池和内存池都是为了提高并发处理能力
(2)Recycler是Netty实现的轻量级对象池
(1)对象池和内存池都是为了提高并发处理能力
Java中频繁创建和销毁对象的开销是很大的,所以通常会将一些对象缓存起来。当需要某个对象时,优先从对象池中获取对象实例。通过重用对象不仅避免了频繁创建和销毁对象带来的性能损耗,而且对于JVM GC也是友好的,这就是对象池的作用。
(2)Recycler是Netty提供实现的轻量级对象池
通过Recycler,在创建对象时,就不需要每次都通过new的方式去创建了。如果Recycler里面有已经用过的对象,则可以直接把这个对象取出来进行二次利用。在不需要该对象时,只需要把它放到Recycler里以备下次需要时取出。
7.Recycler的使用
定义一个对象池实例RECYCLER时,先实现newObject()方法。如果对象池里没有可用的对象,就会调用newObject()方法来新建对象。此外需要创建Recycler.Handle对象与T对象进行绑定,这样才可以通过RECYCLER.get()方法从对象池中获取对象。如果对象不再需要使用,则通过调用T类实现的recycle()方法将对象回收到对象池。
public class RecycleTest { private static final Recycler<User> RECYCLER = new Recycler<User>() { protected User newObject(Handle<User> handle) { return new User(handle); } }; private static class User { //创建Recycler.Handle<User>对象与User对象进行绑定 private final Recycler.Handle<User> handle; public User(Recycler.Handle<User> handle) { this.handle = handle; } public void recycle() { handle.recycle(this); } } public static void main(String[] args) { //1.从对象池中获取User对象 User user1 = RECYCLER.get(); //2.回收对象到对象池 user1.recycle(); //3.从对象池中获取对象 User user2 = RECYCLER.get(); System.out.println(user1 == user2); } }
8.Recycler的四个核心组件
(1)Recycler的初始化和get()方法
(2)第一个核心组件是Stack
(3)第二个核心组件是WeakOrderQueue
(4)第三个核心组件是Link
(5)第四个核心组件是DefaultHandle
(1)Recycler的初始化和get()方法
public abstract class Recycler<T> { ... private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;//Use 4k instances as default. private static final int DEFAULT_MAX_CAPACITY_PER_THREAD; private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int MAX_DELAYED_QUEUES_PER_THREAD; private static final int LINK_CAPACITY; private static final int RATIO; static { int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD)); if (maxCapacityPerThread < 0) { maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD; } DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread; MAX_SHARED_CAPACITY_FACTOR = max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); MAX_DELAYED_QUEUES_PER_THREAD = max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", NettyRuntime.availableProcessors() * 2)); LINK_CAPACITY = safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8)); ... } private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); } }; private final int maxCapacityPerThread; private final int maxSharedCapacityFactor; private final int interval; private final int maxDelayedQueuesPerThread; private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { //Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } }; protected Recycler() { this(DEFAULT_MAX_CAPACITY_PER_THREAD); } protected Recycler(int maxCapacityPerThread) { this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) { this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { interval = safeFindNextPositivePowerOfTwo(ratio); if (maxCapacityPerThread <= 0) { this.maxCapacityPerThread = 0; this.maxSharedCapacityFactor = 1; this.maxDelayedQueuesPerThread = 0; } else { this.maxCapacityPerThread = maxCapacityPerThread; this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread); } } @SuppressWarnings("unchecked") public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { //创建一个DefaultHandle handle = stack.newHandle(); //创建一个对象,并绑定这个DefaultHandle handle.value = newObject(handle); } return (T) handle.value; } ... }
(2)第一个核心组件是Stack
Stack是整个对象池的顶层数据结构。Stack描述了整个对象池的构造,用于存储当前同线程回收的对象,这里同线程的意思就是和后续的异线程相对的。在多线程的场景下,Netty为了避免锁竞争问题,每个线程都会持有各自的Stack。Recycler会通过FastThreadLocal实现每个线程对Stack的私有化。
public abstract class Recycler<T> { ... private static final class Stack<T> { //所属的Recycler final Recycler<T> parent; //所属线程的弱引用 final WeakReference<Thread> threadRef; //异线程回收对象时,其他线程能保存的被回收对象的最大个数 final AtomicInteger availableSharedCapacity; //WeakOrderQueue最大个数 private final int maxDelayedQueues; //对象池的最大大小,默认最大为4K private final int maxCapacity; //存储缓存数据的数组 DefaultHandle<?>[] elements; //缓存的DefaultHandle对象个数 int size; //WeakOrderQueue链表的三个重要结点 private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) { this.parent = parent; threadRef = new WeakReference<Thread>(thread); this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; this.interval = interval; handleRecycleCount = interval;//Start at interval so the first one will be recycled. this.maxDelayedQueues = maxDelayedQueues; } ... } ... }
Stack的结构如下图示,其中Thread A是同线程,Thread B、Thread C、Thread D是异线程。
