ThreadPoolExecutor

简介

  • 对象的创建和销毁需要一定的开销,线程亦是对象,创建线程和销毁线程必然也需要同样的开销。
  • 在某些情况下,线程执行的任务耗时不长,但是任务很多。这样就导致频繁的创建、销毁线程,需要很大的时间开销和资源开销。线程池应运而生。
  • 线程池相当于在这一个池中维护多个线程,需要执行任务时从池中取出一个线程用来执行任务,任务执行完成后将线程放回池中。这样也就减少了开销。
  • 因为减少了每个任务调度的开销,所以它能在执行大量异步任务的场景中提供更好的性能。并且它提供了一种限定和管理资源(比如线程)的方式。他也会保存一些基本的统计信息,比如已完成的任务数量。

Executor

Executor 接口中只有一个方法:

1
void execute(Runnable command);

command 为待执行的任务。

ExecutorService

ExecutorService 接口继承自 Executor 接口,并扩展了几个方法:

关于状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行,
// 可以调用awaitTermination等待所有任务执行。不再接受新的任务。
void shutdown();

// 立刻关闭,尝试取消正在执行的任务(不保证会取消成功),返回未被执行的任务
List<Runnable> shutdownNow();

// 是否发出关闭信号
boolean isShutdown();

// 是否所有任务都执行完毕在shutdown之后,也就是如果不调用shutdownNow或者
// shutdown是不可能返回true
boolean isTerminated();

// 进行等待直到所有任务完成或者超时
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

执行单个任务(立刻返回一个Future存储任务执行的实时状态):

1
2
3
4
5
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

执行多个任务(前两个方法等到所有任务完成才返回,后两个方法等到有一个任务完成,取消其他未完成的任务,返回执行完成的任务的执行结果):

1
2
3
4
5
6
7
8
9
10
11
12
13
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

AbstractExecutorService

该抽象类实现了 ExecutorService 接口,并提供了 ExecutorService 中各方法中的具体实现。例如 submit() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

并扩展了两个 newTaskFor() 方法:

1
2
3
4
5
6
7
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

FutureTask 是任务和执行结果的封装类。它实现了 RunnableFuture 接口,RunnableFuture 接口继承了 Runnable 和 Future 接口。可以通过 get() 方法获取任务的执行结果。

ThreadPoolExecutor

ThreadPoolExecutor 继承自 AbstractExecutorService。

初始化参数介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize: 核心线程数。新任务提交过来时,如果当前活动的线程数少于 corePoolSize 会创建一个新线程来处理这个新任务即使当前有空闲线程。
  • maximumPoolSize:最大线程数。如果当前线程数大于 corePoolSize 小于 maximumPoolSize 且任务队列已满时也会创建新线程。
  • keepAliveTime:如果当前线程数量超出了 corePoolSize,超出的那部分非核心线程会在空闲超出 keepAliveTime 时被终止。这能够在线程池活跃状态不足时及时回收占用的资源。默认情况下核心线程超时不回收,可以通过配置 keepAliveTime 和 allowCoreThreadTimeOut 来允许核心线程超时回收。
  • unit:超时时间单位
  • workQueue:任务等待队列,存放任务
  • threadFactory:线程工厂用于生产线程。可以自定义实现
  • handler:也就是参数 maximumPoolSize 达到后丢弃处理的方法,java 提供了4种丢弃处理的方法;java 默认的是使用 AbortPolicy ,他的作用是当出现这中情况的时候会抛出一个异常。
    1. AbortPolicy: 拒绝提交,直接抛出异常,也是默认的饱和策略;
    2. CallerRunsPolicy: 线程池还未关闭时,用调用者的线程执行任务;
    3. DiscardPolicy: 直接丢弃任务
    4. DiscardOldestPolicy:线程池还未关闭时,丢掉阻塞队列最久为处理的任务,并且执行当前任务。

Executors 构造 ThreadPoolExecutor 对象

Executors 类提供了几个构造 ThreadPoolExecutor 对象的静态方法:

  1. newCachedThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    threadFactory);
    }

    线程数量没有上界(Integer.MAX_VALUE),有新任务提交并且没有空闲线程时,创建一个新线程执行该任务,每个线程空闲时间为 60s, 60s 空闲后线程会被移出缓存。使用 SynchronousQueue 作为任务队列的实现类。适用于执行大量生命周期短的异步任务。

  1. newFixedThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }

    固定容量的线程池。使用 LinkedBlockingQueue 作为任务队列的实现类。当新任务到达时,创建新线程,当线程数达到上限时,将任务放到队列中,任务队列中任务数量没有上界。当线程创建之后就一直存在直至显式的调用 shutdown() 方法。

  1. newSingleThreadExecutor()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }

    单个 Worker 的线程池。和 newFixedThreadPool(1) 类似,区别在于这个实例经过了一次封装,不能对该实例的参数进行重配置,并且实现了 finalize() 方法,能够在 GC 时调用 shutdown() 方法关闭该线程池。

  1. newScheduledThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    ScheduledThreadPoolExecutor 的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行。

  1. newSingleThreadScheduledExecutor()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    将 ScheduledThreadPoolExecutor 进行一层包装。

状态转换

ThreadPoolExecutor 中一些状态相关的变量和方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //低29位表示

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } //c & 111000...000
private static int workerCountOf(int c) { return c & CAPACITY; } //c & 000111...111
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

线程池的状态在 ThreadPoolExecutor 中通过 AtomicInteger 类型的成员变量 ctl 的高3位表示。

  • RUNNING: 111
  • SHUTDOWN: 000
  • STOP: 001
  • TIDYING: 010
  • TERMINATED: 011

变量 ctl 的低29位表示的有效工作线程数。

各状态之间的转换如下图:



  • SHUTDOWN 想转化为 TIDYING,需要 workQueue 为空,同时 workerCount 为0。
  • STOP 转化为 TIDYING,需要 workerCount 为0

方法解析

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果运行中的worker线程数少于设定的核心线程数,增加worker线程,把task分配给新建的worker线程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果任务可以被加入到任务队列中,即等待的任务数还在允许的范围内,
// 再次检查线程池是否被关闭,如果关闭的话,则移除任务并拒绝该任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务数超过了现有worker线程的承受范围,尝试新建worker线程
// 如果无法添加新的worker线程,则会拒绝该任务
else if (!addWorker(command, false))
reject(command);
}

addWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 1. rs > shutdown,即shutdown和running以外的状态
// 2. rs = shutdown
// 1)firstTask不为null,即有task分配
// 2)没有task,但是workQueue(等待任务队列)为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 1. 如果没有设定线程数的限制,worker线程数不能大于最大值(2的29次方-1)
// 2. 如果是固定尺寸的线程池,不能大于固定尺寸
// 3. 如果是可扩展的线程池,不能大于规定的线程数的上限
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 用CAS操作增加线程数量,如果失败,重新循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 检查以下任一状态是否出现
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//执行添加失败相对应的操作
addWorkerFailed(w);
}
return workerStarted;
}

runWorker()

ThreadPoolExecutor 有一个成员类叫 Worker

1
private final class Worker extends AbstractQueuedSynchronizer implements Runnable

这里 AbstractQueuedSynchronizer 的作用是使Worker具有锁的功能,在执行任务时,会把 Worker 锁住,这个时候就无法中断 Worker。Worker 空闲时候是线程池可以通过获取锁,改变 Worker 的某些状态,在此期间因为锁被占用,Worker 就是不会执行任务的。

Worker工作的逻辑在ThreadPoolExecutor#runWorker方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//task为null则从BlockingQueue中等待获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//1. completedAbruptly为true,代表异常,则工作线程数减一
//2. completedTaskCount += w.completedTasks; workers.remove(w);
//3. tryTerminate();尝试停止线程池,正常运行的线程池调用该方法不会有任何动作
//4. 如果线程池没有被关闭的话,Worker也不是异常退出,并且Worker线程数小于最小值(分类情况见源码),则新建一个Worker线程:addWorker(null, false);
processWorkerExit(w, completedAbruptly);
}
}

shutdown()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以操作目标线程
checkShutdownAccess();
//设置状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

接着看下 interruptIdleWorkers() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//中断所有的空闲线程(正在从 workQueue 中取 Task,此时 Worker 没有加锁)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdownNow()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
//中断所有的线程
interruptWorkers();
//拒绝所有新Task的加入,WorkerQueue中没有执行的线程全部抛弃。所以此时Pool是空的,WorkerQueue也是空的。
//获取所有没有执行的Task,并且返回。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//进行到TIDYING和TERMINATED的转化
tryTerminate();
return tasks;
}

总结

本文从 ThreadPoolExecutor 的类继承体系,到初始化参数详解,再到状态转换以及重要方法的解读,由浅入深的介绍了 ThreadPoolExecutor。通过本文可以对线程池的运行原理有一个最基本的理解。