这系列文章会对Java的一些核心类库实现做点笔记,本篇学习的是线程池相关实现。
接口和抽象类
线程池相关
Executor
:是一个抽象层面的核心接口public interface Executor { void execute(Runnable command); }
ExecutorService
:线程池的抽象接口,继承了Executor
并对其进行扩展,提供了一些线程池管理的方法,并引入了Future
对象,不但能执行Runnable
对象,也可以执行Callable
对象。public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean awaitTermination(long timeout, TimeUnit unit); <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); <T> T invokeAny(Collection<? extends Callable<T>> tasks); }
AbstractExecutorService
:ExecutorService
的抽象实现类,实现了invokeAll
和invokeAny
方法。ThreadPoolExecutor
:继承自AbstractExecutorService
,ExecutorService
的主要实现类。ForkJoinPool
:采用分治和work-stealing的思想对大任务进行拆分,并行执行,合理利用CPU资源,ExecutorService
的另一个实现。Executors
:一个工具类,提供不同的工厂方法来创建不同的线程池,如FixedThreadPool
、SingleThreadExecutor
、ScheduledThreadPool
和CacheThreadExecutor
,类似于Collections
。
任务相关
均为函数式接口Functional Interface,即只存在一个抽象方法
Runable
@FunctionalInterface public interface Runnable { public abstract void run(); }
Callable
:配合ExecutorService
使用,存在返回值。@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Future
Future
:抽象接口,对特定的Runable
或Callable
任务进行取消、查询执行结果、查询是否完成等操作。其中get()
方法会阻塞直到任务返回结果。public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); // 参数指定是否打断任务执行 boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException }
RunnableFuture
:继承Runnable
和Future
接口,既可以作为Runnable
被线程执行,又可以作为Future
得到Callable
的返回值。public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask
:RunnableFuture
的实现类,也是唯一实现类,内部维护了一个Thread
对象和一个Callable
对象,给出了两个构造方法。public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Callable<V> callable) public FutureTask(Runnable runnable, V result) }
ThreadFactory
ThreadFactory
:创建线程的工厂接口,与ExecutorService
配合使用,用来创建线程。Executors
中提供了两个实现类,分别是DefaultThreadFactory
和PrivilegedThreadFactory
。public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadPoolExecutor
下面看看ThreadPoolExecutor
的内部实现。
几个关键的内部变量
状态位
引入状态位,高3位表示线程池状态,低29位表示线程池woker(一个线程的抽象)的数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
线程池维护5个状态,状态的变化是不可逆的,由小到大依次是:
- RUNNING:允许接收新任务并且处理队列中的任务
- SHUTDOWN:不再接收新的任务,仅消化队列内的任务
- STOP:不再接收新的任务,队列内的任务也不再处理,正在尝试中断正在执行的任务线程
- TIDYING:所有任务被终止了,工作线程数
workCount
也被设为0,并开始调用钩子函数terminated()
- TERMINATED:钩子函数
terminated()
执行完毕
任务队列
维护一个先进先出的并发队列,用来缓存未分配给worker的任务。Executors
中采用的默认队列是LinkedBlockingQueue
,它的大小上限是Integer.MAX_VALUE
。
private final BlockingQueue<Runnable> workQueue;
工作线程集合
维护一个HashSet
保存所创建的工作线程,由于HashSet
是非线程安全的,在使用时要加锁。
private final HashSet<Worker> workers = new HashSet<Worker>();
任务拒绝Handler
RejectedExecutionHandler
是拒绝任务的接口,声明了rejectedExecution()
方法。
private volatile RejectedExecutionHandler handler;
Handler一般有下面几种
AbortPolicy
:抛出异常(默认)CallerRunsPolicy
:使用调用者所在线程执行任务DiscardOldestPolicy
:从workerQueue中poll一个任务,执行当前任务DiscardPolicy
:默默抛弃,什么都不做
线程数目
private volatile int corePoolSize; //当allowCoreThreadTimeOut是false时,线程池内维护的线程最小数目
private volatile int maximumPoolSize; // 线程池内会维持的最大数目
线程超时时间
private volatile boolean allowCoreThreadTimeOut; // 默认false,线程数小于corePoolSize的线程不会被回收
private volatile long keepAliveTime; // 线程数大于corePoolSize时创建的线程的空闲回收时间,allowCoreThreadTimeOut为ture时所有线程都会被回收
几个关键方法
execute
执行新任务的核心方法,调用submit
也会执行该方法,具体执行流程如下:
- 1.当线程池小于
corePoolSize
时,将创建一个新线程执行任务,即使此时线程池中存在空闲线程 - 2.当线程池达到
corePoolSize
时,任务将被放入workQueue中,等待线程池中任务调度执行 - 3.当
workQueue
已满,且maximumPoolSize > corePoolSize
时,创建新线程执行任务 - 4.当提交任务数超过
maximumPoolSize
时,任务由RejectedExecutionHandler
处理 - 5.当线程池中超过
corePoolSize
线程,空闲时间达到keepAliveTime
时,关闭空闲线程
addWorker
execute
方法需要创建新线程执行任务时会调用addWorker
方法,会将任务封装为一个Worker
对象,加入HashSet
中并启动相应工作线程。
// addWoker 部分代码
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动worker
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
Worker
是实现了Runable
接口的内部类,对工作线程进行了封装。同时Worker
继承了AQS
类,其tryLock()
方法为不可重入的独占锁,用于判断线程是否空闲以及是否可以被打断。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // Worker类本身实现了Runnable接口,线程调用start会直接触发runWorker
}
public void run() {
runWorker(this);
}
}
runWorker
runWorker()
方法内维护一个循环,执行当前Worker
对象的Task或调用getTask()
方法从workerQueue
中读取任务进行执行,方法中会调用两个钩子方法beforeExecute()
和afterExecute()
在任务执行前后进行特定工作。
当前Worker
对象没有Task或getTask()
方法返回null
时,会结束循环并调用processWorkerExit()
方法将线程关闭。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
try {
while (task != null || (task = getTask()) != null) {
// ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
getTask()
在下面几种情况将返回null:
- 当前线程数即将超过
maxPoolSize
- 线程池被关闭
allowCoreThreadTimeOut
为false
,线程数大于corePoolSize
,并且从workerQueue
取数据超过了keepAliveTime
allowCoreThreadTimeOut
为true
,从workerQueue
取数据超过了keepAliveTime
shutdown和shutdownNow
关闭(销毁)线程池,shutdown()
会将线程池状态设置为SHUTDOWN
,拒绝新任务加入,终端空闲线程,但已在队列里的任务会继续处理。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers(); // 中断空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow()
方法则会强行关闭线程池,尝试将线程池状态设置为STOP
,会中断所有线程。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 取出队列中没有被执行的任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Executors中的几种常见的线程池
FixedThreadPool
指定了最大线程数,采用LinkedBlockingQueue
作为workerQueue。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
SingleThreadExecutor
只有一个线程的线程池,同样采用LinkedBlockingQueue
作为workerQueue。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
CachedThreadPool
每个任务都会创建新县城执行任务,最大线程数为Integer.MAX_VALUE
,采用同步阻塞队列SynchronousQueue
作为workerQueue。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
ScheduledThreadPool
可以实现延迟、调度。最大线程数为Integer.MAX_VALUE
,采用有序队列DelayedWorkQueue
作为workerQueue(会按每个任务按照距离下次执行时间间隔的大小来排序)。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
阿里Java开发文档不允许使用Executors创建线程池
从上面可以看到,Executors
给出的几种线程池都是有弊端的:
FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOMCachedThreadPool
和ScheduledThreadPool
:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
因此,在实际使用时需要灵活配置参数,直接用ThreadPoolExecutor
创建线程池。
(End)
参考资料: