一文带你了解线程池原理
1.使用线程池的意义何在?
项目开发中,为了统一管理线程,并有效精准地进行排错,我们经常要求项目人员统一使用线程池去创建线程。因为我们是在受不了有些人动不动就去创建一个线程,使用的多了以后,一旦报错就只有一个线程报错信息,还是线程的共用信息,再加上如果你将异常吃了(捕获后不做处理)的情况下,这个错误。。。。em,我实在不知道去哪里排查,不然你换个人试试吧。
2.线程池的重要参数----你真的了解吗
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- corePoolSize:核心线程数。设置核心线程数的意义何在?通俗来讲核心线程数就是正式员工,需要长期坚守岗位,有任务就需要执行。
- maximumPoolSize:最大线程池个数。设置最大线程池数量的意义何在?其实就是一个容错机制,当你的需要执行的线程个数已经爆满并且超过的时候,提供了一个容错机制,可以保证在短期内多余的任务正常执行。相当于就是临时工,临时过来执行任务,任务结束后就可以走了。
- keepAliveTime:保活的时间。设置的意义何在?当线程任务无剧增的情况下,维持在正常提亮。你无需那么多临时工来执行任务,所以规定时间,临时工可以走人了,也即是除核心线程外的线程可以回收了。
- TimeUnit:保活的时间单位。这个就不多赘述了。
- BlockingQueue:阻塞队列。设置阻塞队列的意义何在?当所有核心线程都正在工作时,将其放入阻塞队列,等待后续执行。也就是这个任务进行排队,等正式工忙完了继续做。
- ThreadFactory:线程工厂。生产线程,由你自己去定义你想生产什么样的线程。
- RejectedExecutionHandler:拒绝策略。当你的最大线程与阻塞队列都满了。这个时候,你已经接收不了新的任务进行处理了。所以设置拒绝策略。相当于就是我所有的员工和临时工都在工作了,并且排队的任务都满了,应对这样的情况,你打算如何做。
除此之外还有一个重要的参数:
/** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */ private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程数超时退出。
该参数有在特定的业务场景下有很大的意义。比如:你的业务只在晚上需要执行,其余时间无需执行。那么为何不把资源让出来,白天的时候,可以让其他业务占有这些资源去执行呢。
3.ThreadExecutorPool线程池重要源码解析
由该类图可知,Executor执行器定义执行方法,ExecutorService定义线程池操作的基本方法,AbstractExecutorService定义了线程池操作的方法模板。
ThreadPoolExecutor任务执行流程图
1.首先是构造方法
基本的参数校验与赋值,简单代码不过多赘述。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ////基本的参数校验 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
2.线程执行的方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null);//将线程对象封装成RunnableFuture execute(ftask);//任务执行 return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task);//将线程对象封装成RunnableFuture execute(ftask);//任务执行 return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result);//将线程对象封装成RunnableFuture execute(ftask);//任务执行 return ftask; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//获取当前的线程池状态。单个参数,保存了线程池的状态以及线程数量 if (workerCountOf(c) < corePoolSize) { //当线程数量小于核心线程数 if (addWorker(command, true)) //直接添加任务,运行线程 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//如果核心线程数已经满了,那么直接添加到阻塞队列。 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//线程池不是running状态,执行拒绝策略。 reject(command); else if (workerCountOf(recheck) == 0)//线程池线程数量不能为0,需要有一个线程对线程池的后续操作进行处理,比如关闭线程池 addWorker(null, false); } else if (!addWorker(command, false))//当核心线程与阻塞队列都满了的时候,直接添加任务到非核心线程运行。添加失败直接执行拒绝策略 reject(command); }
1.关于ctl.get()方法的解释---利用了单个变量,保存了线程池状态以及线程数量的值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //运行状态 正常执行任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //关闭线程池,不再接收新任务 private static final int STOP = 1 << COUNT_BITS; //关闭线程池,所有任务停止 private static final int TIDYING = 2 << COUNT_BITS; //中间状态 private static final int TERMINATED = 3 << COUNT_BITS; //线程池已经关闭 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
2.addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get();//获取ctl的快照保存在栈上 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && //如果线程池已经关闭,或者(当前线程池关闭状态当前任务是空且当前工作队列不为空)不满足的情况下直接返回 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//CAS修改线程池ctl变量,增加线程数 break retry; //添加成功直接退出 c = ctl.get(); // 添加不成功,为了保证多线程运行的安全性,重新获取 if (runStateOf(c) != rs)//当前线程池状态发生改变 continue retry; //直接重新运行retry循环体 // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //生成自定义的线程woker final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock;//这个代码没有意义,mainLock定义的变量为final。可以直接使用 mainLock.lock();//添加work使用锁,保证添加任务的原子性。 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || //线程池处于running状态 (rs == SHUTDOWN && firstTask == null)) {//线程池处于showdown状态但是firstTask为空。 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize)//保存当前线程池中线程的最大数量 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {//添加成功,运行线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted)//线程启动失败 addWorkerFailed(w);//移除work,减少线程数量 } return workerStarted; }
t.start()执行线程任务
//Worker类中实际执行任务的方法 public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts //将原始的线程状态为-1修改为0,后续通过getState()>=0获取线程是否已经运行的状态,允许线程中断。-1默认为初始化,此处需要进行处理 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//task不等于空直接运行,task等于空从workerQueue阻塞队列获取任务 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||//线程池运行状态大于等于STOP (Thread.interrupted() && //线程是否已经被中断了 runStateAtLeast(ctl.get(), STOP))) &&//鲜橙汁运行状态大于等于STOP !wt.isInterrupted())//判断任务的线程如果没有被中断 wt.interrupt();//中断当前任务线程 try { beforeExecute(wt, task);//钩子函数,实际任务运行之前做处理 Throwable thrown = null; try { task.run();//执行实际任务代码 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//钩子函数,实际任务运行之后做处理 } } finally { task = null;//将任务置空 w.completedTasks++;//任务完成数加1 w.unlock(); } } completedAbruptly = false;//执行过程中是否发成异常 } finally { processWorkerExit(w, completedAbruptly); } }
//执行任务退出操作 private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果有异常中断导致任务结束 decrementWorkerCount();//将线程数量减1 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;//完成的任务数量累加 workers.remove(w);//从workers的任务集合中移除当前任务 } finally { mainLock.unlock(); } tryTerminate();//尝试关闭线程池 int c = ctl.get();//获取当前线程池的最新状态 if (runStateLessThan(c, STOP)) {//如果当前任务状态小于STOP if (!completedAbruptly) {//当前任务执行无异常发生 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//根据allowCoreThreadTimeOut参数获取最小的线程数量 if (min == 0 && ! workQueue.isEmpty())//如果核心线程允许退出,并且工作队列不为空 min = 1;//设置最小值为1,因为最后需要有线程去执行线程池的后续处理,所有线程都没了,后续线程池退出无线程处理 if (workerCountOf(c) >= min)//如果工作的线程数量大于等最小值 return; // replacement not needed 直接返回 } addWorker(null, false);//如果当前线程数已经小于最小线程数,那么需要保证最小线程数在运行,所以需要有保证线程池的正常运行,添加一个空任务。 } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get();//获取当前线程池状态 int rs = runStateOf(c);//获取当前运行状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//如果线程池状态大于等于SHUTDOWN并且(线程数量大于等于STOP或者工作队列为空) decrementWorkerCount();//将线程池中线程数量减1 return null; } int wc = workerCountOf(c);//获取当前线程池的线程数量 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//判断是否运行核心线程数超时,判断是否需要超时机制 if ((wc > maximumPoolSize || (timed && timedOut))//工作线程大于最大线程池数量或者允许超时并且有超时的情况 && (wc > 1 || workQueue.isEmpty())) {//并且线程池线程数量大于1或者阻塞队列为空 if (compareAndDecrementWorkerCount(c))//CAS操作将线程池数量减1 return null;//返回空 continue;//CAS失败继续 } try { Runnable r = timed ?//允许超时从队列中拿任务并等待keepAliveTime时间 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();阻塞等待 if (r != null)//获取的任务不为空 return r;//直接返回 timedOut = true;//如果为空,超时标志位为true } catch (InterruptedException retry) { timedOut = false; } } }
3.addWorkerFailed方法解析
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取锁 try { if (w != null)//work不是空 workers.remove(w);//直接从workers中移除当前任务 decrementWorkerCount();//加个ctl中的woker数量减少 tryTerminate();//如果线程池已经是showdown状态,尝试让线程池停止。多线程协作的函数 } finally { mainLock.unlock(); } }
3.线程池关闭shutdown方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//检查关闭权限,可以忽略 advanceRunState(SHUTDOWN);//线程池状态递进,由running变为shutdown interruptIdleWorkers();//中断所有空闲线程 onShutdown(); // hook for ScheduledThreadPoolExecutor钩子函数,调度线程池使用 } finally { mainLock.unlock(); } tryTerminate();//尝试将线程池关闭。 }
1.advanceRunState方法解析
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get();//获取当前的线程状态 if (runStateAtLeast(c, targetState) ||//当前状态已经是大于等于shutdown直接退出 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//cas操作将线程状态改为targetState。 break; } }
2.interruptIdleWorkers方法解析
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取锁 try { for (Worker w : workers) {//遍历works中所有的工作任务 Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//如果没有被中断过,并且可以获得锁,证明属于空闲线程 try { t.interrupt();//将线程中断,打上中断标志位 } catch (SecurityException ignore) { } finally { w.unlock();//解锁 } } if (onlyOne)//只中断一个线程标识 break; } } finally { mainLock.unlock(); } }
4.shutdownNow方法解析
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//权限检查 advanceRunState(STOP);//状态递进 详细方法见上面 interruptWorkers();//中断所有启动的work线程 tasks = drainQueue();//将所有未执行的任务出队保存 } finally { mainLock.unlock(); } tryTerminate();//尝试关闭线程池 return tasks; }
1.interruptWorkers方法解析
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取锁 try { for (Worker w : workers)//遍历所有woker进行处理 w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//当前work的状态大于0并且线程不为空且线程未被中断 try { t.interrupt(); } catch (SecurityException ignore) { } } } 使用getState() >= 0表示当前线程已经启动,runWorker方法中会将其状态从-1改变。证明线程已经启动 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
2.drainQueue方法解析
//标准的入队和出队功能不做过多注释 private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
5.tryTerminate方法解析
final void tryTerminate() { for (;;) { int c = ctl.get();//获取当前线程状态ctl if (isRunning(c) ||//线程池正在运行 runStateAtLeast(c, TIDYING) ||//线程池状态大于等于TIDYING,有其他线程已经改变线程池状态为TIDYING或者TERMINATED了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//线程池状态等于shutdown并且工作队列不为空。 return;//以上三种情况线程池无法关闭,需要继续处理 if (workerCountOf(c) != 0) { // Eligible to terminate//当前工作线程数量不等于0 interruptIdleWorkers(ONLY_ONE);//中断线程且只中断一个 return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//cas操作将线程池状态置为TIDYING try { terminated();//线程池终止 } finally { ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED termination.signalAll();//信号唤醒所有等待线程 } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
4.总结
线程池的运用在项目中已经成为一种常态,作为一个开发人员最重要的了解其背后的设计原理以及流程,更好地运用线程池,方便提升项目程序的性能以及排查错误。在阅读对应的线程池源码时,我们只局限于单线程的思维,更多的是要去考虑当多线程并发执行时的临界条件。了解设计者的设计初衷、以及设计意图,能让你更好地在项目中运用并设计符合自己项目的线程池。以上是我个人对于线程池ThreadPoolExecutor的理解,不足之处,请多多指教。