【Java线程池】 java.util.concurrent.ThreadPoolExecutor 源码分析

线程池概述

线程池,是指管理一组同构工作线程的资源池。
线程池在工作队列(Work Queue)中保存了所有等待执行的任务。工作者线程(Work Thread)会从工作队列中获取一个任务并执行,然后返回线程池并等待下一个任务。

线程池比执行任务再创建线程会有以下优势:

  1. 节省资源。通过重用线程来省去在线程创建和销毁过程中产生的开销。
  2. 提高响应性。当执行请求到达时,工作线程通常已经存在,因此不需要等待线程创建,从而提高了响应性。
  3. 防止资源耗尽。通过调整线程池的大小,防止过多线程相互竞争资源。

ThreadPoolExecutor 是Java中线程池的实现类。下图是继承关系:

classDiagram direction BT class AbstractExecutorService class Executor { <<Interface>> } class ExecutorService { <<Interface>> } class ThreadPoolExecutor AbstractExecutorService ..> ExecutorService ExecutorService --> Executor ThreadPoolExecutor --> AbstractExecutorService
  • Executor ,基于生产者-消费者模式,提交相当于生产者,执行任务相当于消费者。通过该模式将任务的提交与执行解耦开来。
  • ExecutorService,是Executor 的实现,该接口基于Executor 提供了生命周期的支持,例如任务的运行、关闭和中止,以及通过 submit 方法来创建一个异步任务 Future
  • AbstractExecutorService,是ExecutorService 的实现,使得下层实现只需要关注任务的执行。
  • ThreadPoolExecutor,是Java中线程池的实现。

下图是ThreadPoolExecutor的大致结构

【Java线程池】 java.util.concurrent.ThreadPoolExecutor 源码分析

参考 Java线程池实现原理及其在美团业务中的实践

生命周期

Executor 的实现通常会创建线程来执行任务。由于 Executor 以异步的方式来执行任务,因此之前提交任务的状态不是立即可见的,有些任务已经完成,有些还在运行,有些在等待执行。当关闭应用程序时,是完成当前任务并不接受新任务,还是直接关闭所有任务(不管是在执行还是没有执行)。
为了解决执行服务的生命周期问题,Executor 扩展了 ExecutorService 接口,该接口提供了对生命周期管理的方法。

public interface ExecutorService extends Executor { 	void shutdown(); 	List<Runnable> shutdownNow(); 	boolean awaitTermination(long timeout, TimeUnit unit); 	boolean isShutdown(); 	boolean isTerminated(); 	 	// 以及一些用于创建异步任务的方法 	<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); 	Future<?> submit(Runnable task); 	....	 } 

在ThreadPoolExecutor中提供了五种状态:

状态 描述 转换
RUNTIME 接受新任务并处理队列任务
SHUTDOWN 不接受新任务,但是处理队列中任务 RUNTIME->SHUTDOWN:执行 shutdown()
STOP 不接受新任务,不处理队列中任务,并中断正在运行中的任务 RUNTIME->STOP:执行 shutdownNow()
TIDYING 所有任务已经终止,工作者线程为0,之后会执行一个钩子函数(TERMINATED())用于清理 SHUTDOWN->TIDYING:队列和线程池都为空
STOP->TIDYING: 线程池为空
TERMINATED 钩子函数执行完毕 TIDYING->TERMINATED

下图是状态之间的转换关系:
【Java线程池】 java.util.concurrent.ThreadPoolExecutor 源码分析

如果任务队列被填满(在队列大小有限的情况下)或者某个任务被提交到一个已经被关闭的Executor中时应该怎么处理这些情况?JDK提供了一种策略来处理这些情况--饱和策略

ThreadPoolExecutor中通过ctl字段来维护了线程池的运行状态和线程数量(工作者线程)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 

具体的可以通过两个参数来说明:

  • COUNT_BIT的值为29(32-3)
  • COUNT_MASK 则为高三位为0、低29位全1的字段
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; 

每个状态的取值如下,每个取值都向左边移动了29位:

private static final int RUNNING    = -1 << COUNT_BITS;   private static final int SHUTDOWN   =  0 << COUNT_BITS;   private static final int STOP       =  1 << COUNT_BITS;   private static final int TIDYING    =  2 << COUNT_BITS;   private static final int TERMINATED =  3 << COUNT_BITS; 

所以高三位用来表示线程池的状态
之后用来确定线程池中的线程数量都使用COUNT_MASK来计算,这样就能计算低29位

private static int workerCountOf(int c)  { return c & COUNT_MASK; } 

ThreadPoolExecutor 通过如下的方法来检测当前线程池的状态:
运行是要比终结状态小的

// 当前状态要比给定的状态小,如 running < terminated private static boolean runStateLessThan(int c, int s) {       return c < s;   }   // 当前状态要高于给定的状态,如 terminated >= running private static boolean runStateAtLeast(int c, int s) {       return c >= s;   }   // 检测当前线程是否在运行 private static boolean isRunning(int c) {       return c < SHUTDOWN;   } 

饱和(拒绝)策略

ThreadPoolExecutor的饱和策略可以通过调用 setRejectedtExecutionHandler 来修改。
JDK 提供了几种不同的 RejectedExecutionHandler 的实现:

名称 描述
AbortPolicy 默认的饱和策略,会抛出一个 RejectedExecutionException
可以捕获这个异常然后进行处理
CallerRunsPolicy 该策略实现了一种调节机制,不会抛弃任务也不会抛出异常。
而是将该任务在调用者线程中运行,使得调用者需要执行完该任务才能继续提交任务。
这样使得工作者线程有时间来处理正在执行的任务。
DiscardPolicy 会抛弃任务,不会提醒任务被抛出
DiscardOldestPolicy 抛弃下一个将被执行的任务并将该任务重新提交

队列任务管理

如果请求速率超过了线程池的处理速率,那么新到来的请求将会累计起来,这些请求会在一个由 Executor 管理的 Runnable 队列(也就是任务队列 Work Queue)中等待。但这仍有可能超出缓存的数量。
基本的任务排队方法有三种:

  • 有界队列:有长度限制
  • 无界队列:没有长度限制
  • 同步移交:立即将元素传输给正在等待的消费者
    BlockingQueueThreadPoolExecutor 的任务队列:
public interface BlockingQueue<E> extends Queue<E> { 	offer(); 	pool(); 	put(); 	take(); 	.... } 

JDK 提供了以下几种实现类:

类名 描述 类型
LinkedBlockingQueue 由链表组成,此队列按照FIFO对元素进行排序。默认长度为 Integer.MAX_VALUE 有界阻塞队列
PriorityBlockingQueue 优先队列,默认按从小到大(自然序)进行排序。不能保证同优先级元素的顺序 无界阻塞队列
ArrayBlockingQueue 由数组组成,按照FIFO对元素进行排序。支持公平锁和非公平锁,默认使用非公平锁 有界阻塞队列
SynchronousQueue 不是真正的队列,是一种特殊的阻塞队列,没有实际的容量,任意线程都会等待直到获得数据或者交付完成才会返回 无界阻塞同步移交
DelayQueue 实现了PriorityBlockingQueue的延迟的无界队列,指定多久能从队列中获取元素。 无界队列
LinkedTransferQueue 多实现了一个 TransferQueue 接口,支持将元素移交给正在等待的消费者 无界队列
LinkedBlockingDeque 由链表组成的双向阻塞队列,得益于双向队列的特性,多线程并发时,可以将锁的竞争最多降到一半 阻塞队列

工作者线程

所有的任务都是由工作者线程来执行的,那么工作者线程是如何执行这些任务的,以及线程池是如何维护工作者线程的。下面是ThreadPoolExecutor中的工作者线程类Worker

private final class Worker       extends AbstractQueuedSynchronizer       implements Runnable   { 	// 线程 	final Thread thread; 	// 第一个任务 	Runnable firstTask; 	// 执行任务的数量 	volatile long completedTasks; 	.... } 

工作者线程实现了Runnable接口,并且它包含了一个Thread ,说明工作者线程是一个特殊的任务也是一个线程,它可以去执行一些其他任务也可以自控制。

工作者线程继承自AQS,而不是使用的可重入锁ReentrantLock,目的是实现可不重入的特性(线程封闭)。每个工作者线程必须是线程封闭的。
在工作者线程构造器中有一段:

this.thread = getThreadFactory().newThread(this); 

通过调用线程工厂创建线程并将this(也就是工作者线程)提交给Thread
也就是执行Thread就启动了自己,所以工作者线程可以自己管理自己
addWorkder中创建工作者线程之后执行了它,这里的t就是thread

t.start();   

线程池中的工作者线程的状态由两个字段来控制:

// 生存时间 private volatile long keepAliveTime; // 是否允许核心线程超时等待 private volatile boolean allowCoreThreadTimeOut; 

keepAliveTime 是当线程数量大于核心线程数数量时工作者线程没有任务时存活的时间
例如当前工作者线程数量是30,核心线程数量上限是20,最大线程数量是30。那么多出来10个线程在线程池比较空闲的时候是需要清除的,因为这是占用了多余的系统资源。
keepAliveTime是为了保证突然之间线程池繁忙的情况,这时候就没必要立马清除这些线程,可以"等等看"有没有突发情况。

allowCoreThreadTimeOut 则是使得核心线程也受keepAliveTime的影响
这些具体体现在从队列中获取任务的时候,下面会详细描述

线程工厂

每当线程池创建一个线程,都是通过线程的工厂方法创建的。
默认的线程工厂方法创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过制定一个工厂方法,可以定制线程池的配置信息。每当线程池需要一个新的线程都会调用getThreadFactory()这个方法。

以下是ThreadPoolExecutor 创建一个工作者线程,是通过工厂方法创建的:

Worker(Runnable firstTask) {       setState(-1); // inhibit interrupts until runWorker       this.firstTask = firstTask;       this.thread = getThreadFactory().newThread(this);   } 

ThreadFactory 是一个接口

public interface ThreadFactory {     Thread newThread(Runnable r);   } 

在构造线程池ThreadPoolExecutor时,可以传入一个线程工厂,使得创建线程时通过该线程工厂创建。

执行一个任务

ThreadPoolExecutor 是通过实现Executor来执行任务的
具体分为三个步骤:

  1. 如果当前正在运行的线程小于corePoolSize则创建一个工作者线程并将该任务作为该线程的第一个任务执行
  2. 如果当前任务能够进入任务队列,仍需要检查线程池的运行状态。如果线程池关闭则需要将任务交给饱和策略处理。如果没有关闭并且工作者线程为0,则需要创建工作者线程(这个时候任务已经在队列中)。
  3. 如果不能够进入队列,则尝试创建工作者线程去处理任务;如果失败则说明已经饱和,则将任务交给饱和(拒绝)策略处理。

【Java线程池】 java.util.concurrent.ThreadPoolExecutor 源码分析

copy from Java线程池实现原理及其在美团业务中的实践

通过源码来理解一下:

public void execute(Runnable command) {   	// 任务不能为空     if (command == null)           throw new NullPointerException(); 	// 任务数量 	int c = ctl.get();       if (workerCountOf(c) < corePoolSize) {           if (addWorker(command, true))               return;           c = ctl.get();       }       if (isRunning(c) && workQueue.offer(command)) {           int recheck = ctl.get();           if (! isRunning(recheck) && remove(command))               reject(command);           else if (workerCountOf(recheck) == 0)               addWorker(null, false);       }       else if (!addWorker(command, false))           reject(command);   } 

当执行一个任务会将一个任务放到工作队列中或者是直接创建一个工作者线程去执行该任务。
addWorker 是创建一个工作者线程并运行一个任务的(可以不运行一个任务)
这段代码主要做两个工作:

  1. 保证能正常添加工作者线程,数量不能超过设定的范围并且线程池没有关闭。
  2. 向工作者线程组中添加一个工作者线程并且线程池在正常运行,之后运行该线程去执行任务。
private boolean addWorker(Runnable firstTask, boolean core) {   	// 保证能添加工作者线程     retry:       for (int c = ctl.get();;) {           // Check if queue empty only if necessary.           if (runStateAtLeast(c, SHUTDOWN)               && (runStateAtLeast(c, STOP)                   || firstTask != null                   || workQueue.isEmpty()))               return false;              for (;;) { 	        // 如果超过限定数量,这个数量可以是最小的活跃线程数量可以是最大的活跃线程数量               if (workerCountOf(c)                   >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))                 return false;               if (compareAndIncrementWorkerCount(c))   	            // 只有能增加工作者线程才退出                 break retry;               //重新获取并检查运行状态             c = ctl.get();  // Re-read ctl               if (runStateAtLeast(c, SHUTDOWN))                   continue retry;               // else CAS failed due to workerCount change; retry inner loop           }       }   	// 从这里创建一个工作者线程并运行该线程     boolean workerStarted = false;       boolean workerAdded = false;       Worker w = null;       try {           w = new Worker(firstTask);           final Thread t = w.thread;           if (t != null) {               final ReentrantLock mainLock = this.mainLock;               mainLock.lock();               try {   	            // 检测线程池运行状态                 if (isRunning(c) ||                       (runStateLessThan(c, STOP) && firstTask == null)) {                     // 检测线程状态,线程状态必须为 NEW                     if (t.getState() != Thread.State.NEW)                           throw new IllegalThreadStateException();                       // 将该工作者线程添加到工作者线程组中                     workers.add(w);                       workerAdded = true;                       int s = workers.size();                       if (s > largestPoolSize)                           largestPoolSize = s;                   }               } finally {                   mainLock.unlock();               }               // 如果工作者线程已经添加则运行该线程             if (workerAdded) {                   t.start();                   workerStarted = true;               }           }       } finally {           if (! workerStarted)               addWorkerFailed(w);       }       return workerStarted;   } 

获取并执行任务

工作者线程获取任务主要有两个途径:

  1. 通过创建工作者线程时给予的
  2. 从任务队列中获取的
    期间会一直检查线程池的状态。如果遇到线程池停止了需要去确保当前线程的中断状态
    如果线程池没有终止也需要确保线程能够正常运行
final void runWorker(Worker w) {       Thread wt = Thread.currentThread();       Runnable task = w.firstTask;       w.firstTask = null;       w.unlock(); // allow interrupts       boolean completedAbruptly = true;       try {   	    // 有两种情况 	    // 1. 执行firstTask 	    // 2. 获取一个任务并执行         while (task != null || (task = getTask()) != null) {               w.lock();             // 如果线程池已经停止,确保当前线程是中断的             // 如果线程池正在运行,确保线程没有中断 	        // 第二次获取控制状态是为了确保线程池在关闭的过程中能够正常中断当前线程 			if ((runStateAtLeast(ctl.get(), STOP) ||   				 (Thread.interrupted() &&   				  runStateAtLeast(ctl.get(), STOP))) &&   				!wt.isInterrupted()) 				wt.interrupt(); 			// 从这里开始执行任务             try {                   beforeExecute(wt, task);                   try {                       task.run();                       afterExecute(task, null);                   } catch (Throwable ex) {                       afterExecute(task, ex);                       throw ex;                   }               } finally {                   task = null;                 // 增加任务执行数量,这里是线程封闭的,所以不需要考虑并发的情况                   w.completedTasks++;                   w.unlock();               }           }           completedAbruptly = false;       } finally {   	    // 线程自回收         processWorkerExit(w, completedAbruptly);       }   } 

从任务队列中获取任务

工作者线程通过 getTask获取一个任务来执行

private Runnable getTask() {       boolean timedOut = false; // Did the last poll() time out?          for (;;) {           int c = ctl.get();              // Check if queue empty only if necessary.           // 检测线程池状态,队列不能为空         if (runStateAtLeast(c, SHUTDOWN)               && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {               decrementWorkerCount();               return null;           }   		//          int wc = workerCountOf(c);              // 这一段用来表示核心线程是否受影响         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;   		// 工作者线程太多或者任务队列为空         if ((wc > maximumPoolSize || (timed && timedOut))               && (wc > 1 || workQueue.isEmpty())) {               // 减少工作者线程数量             if (compareAndDecrementWorkerCount(c))                   return null;               continue;           }   		// 从队列中获取一个任务         try {   	        // 这一段体现了keepAliveTime的作用 	        // 超过keepAliveTime给定时间没有获取到任务,那么线程将会被清理/回收掉             Runnable r = timed ?                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                   workQueue.take();               if (r != null)                   return r;               timedOut = true;           } catch (InterruptedException retry) {               timedOut = false;           }       }   } 

工作者线程回收

processWorkerExit 负责这一工作,具体流程如下:

  1. 从工作者线程集合中移除
  2. 尝试终止线程池
  3. 尝试通过创建一个工作者线程来替换当前线程,这种情况可能由以下原因
    1. 当前被清除的线程可能由任务异常而退出
    2. 没有工作者线程执行任务
    3. 队列中没有任务
private void processWorkerExit(Worker w, boolean completedAbruptly) {       if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted           decrementWorkerCount();          final ReentrantLock mainLock = this.mainLock;       mainLock.lock();       // 将工作者线程从集合中移除     try {   	    // 汇总执行完毕的任务数量         completedTaskCount += w.completedTasks;           workers.remove(w);       } finally {           mainLock.unlock();       }   	// 尝试终止线程池     tryTerminate();   	// 3. 尝试创建工作者线程     int c = ctl.get();       if (runStateLessThan(c, STOP)) {           if (!completedAbruptly) {   		    // 活跃的线程数量,其他的工作者线程可能超时回收了             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;               // 活跃的线程数量为0并且任务队列为空             if (min == 0 && ! workQueue.isEmpty()) 			// 当前的活跃线程数量                 min = 1;             // 实际的活跃线程数量要大于预测的就没必要创建,因为线程数量够用             if (workerCountOf(c) >= min)                   return; // replacement not needed           }           // 否则创建一个工作者线程来替代当前线程         addWorker(null, false);       }   } 

参考

Java 并发编程实战
Java线程池实现原理及其在美团业务中的实践

发表评论

相关文章