awakeBird Back-end Dev Engineer

Java核心类库学习(一) 线程池

2019-01-14

这系列文章会对Java的一些核心类库实现做点笔记,本篇学习的是线程池相关实现。

接口和抽象类

线程池相关

  • Executor:是一个抽象层面的核心接口
    public interface Executor {
      void execute(Runnable command);
    }
    
  • ExecutorService:线程池的抽象接口,继承了Executor并对其进行扩展,提供了一些线程池管理的方法,并引入了Future对象,不但能执行Runnable对象,也可以执行Callable对象。
    public interface ExecutorService extends Executor {
      void shutdown();
      List<Runnable> shutdownNow();
      boolean awaitTermination(long timeout, TimeUnit unit);
      <T> Future<T> submit(Callable<T> task);
      <T> Future<T> submit(Runnable task, T result);
      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
      <T> T invokeAny(Collection<? extends Callable<T>> tasks);
    }
    
  • AbstractExecutorServiceExecutorService的抽象实现类,实现了invokeAllinvokeAny方法。
  • ThreadPoolExecutor:继承自AbstractExecutorServiceExecutorService的主要实现类。
  • ForkJoinPool:采用分治和work-stealing的思想对大任务进行拆分,并行执行,合理利用CPU资源,ExecutorService的另一个实现。
  • Executors:一个工具类,提供不同的工厂方法来创建不同的线程池,如FixedThreadPoolSingleThreadExecutorScheduledThreadPoolCacheThreadExecutor,类似于Collections

任务相关

均为函数式接口Functional Interface,即只存在一个抽象方法

  • Runable
    @FunctionalInterface
    public interface Runnable {
      public abstract void run();
    }
    
  • Callable:配合ExecutorService使用,存在返回值。
    @FunctionalInterface
    public interface Callable<V> {
      V call() throws Exception;
    }
    

Future

  • Future:抽象接口,对特定的RunableCallable任务进行取消、查询执行结果、查询是否完成等操作。其中get()方法会阻塞直到任务返回结果。
    public interface Future<V> {
      boolean cancel(boolean mayInterruptIfRunning); // 参数指定是否打断任务执行
      boolean isCancelled();
      boolean isDone();
      V get() throws InterruptedException, ExecutionException;
      V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
    }
    
  • RunnableFuture:继承RunnableFuture接口,既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
    public interface RunnableFuture<V> extends Runnable, Future<V> {
      void run();
    }
    
  • FutureTaskRunnableFuture的实现类,也是唯一实现类,内部维护了一个Thread对象和一个Callable对象,给出了两个构造方法。
    public class FutureTask<V> implements RunnableFuture<V> {
      public FutureTask(Callable<V> callable)
      public FutureTask(Runnable runnable, V result)
    }
    

ThreadFactory

  • ThreadFactory:创建线程的工厂接口,与ExecutorService配合使用,用来创建线程。Executors中提供了两个实现类,分别是DefaultThreadFactoryPrivilegedThreadFactory
    public interface ThreadFactory {
      Thread newThread(Runnable r);
    }
    

ThreadPoolExecutor

下面看看ThreadPoolExecutor的内部实现。

几个关键的内部变量

状态位

引入状态位,高3位表示线程池状态,低29位表示线程池woker(一个线程的抽象)的数量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

线程池维护5个状态,状态的变化是不可逆的,由小到大依次是:

  • RUNNING:允许接收新任务并且处理队列中的任务
  • SHUTDOWN:不再接收新的任务,仅消化队列内的任务
  • STOP:不再接收新的任务,队列内的任务也不再处理,正在尝试中断正在执行的任务线程
  • TIDYING:所有任务被终止了,工作线程数workCount也被设为0,并开始调用钩子函数terminated()
  • TERMINATED:钩子函数terminated()执行完毕

任务队列

维护一个先进先出的并发队列,用来缓存未分配给worker的任务。Executors中采用的默认队列是LinkedBlockingQueue,它的大小上限是Integer.MAX_VALUE

private final BlockingQueue<Runnable> workQueue;

工作线程集合

维护一个HashSet保存所创建的工作线程,由于HashSet是非线程安全的,在使用时要加锁。

private final HashSet<Worker> workers = new HashSet<Worker>();

任务拒绝Handler

RejectedExecutionHandler是拒绝任务的接口,声明了rejectedExecution()方法。

private volatile RejectedExecutionHandler handler;

Handler一般有下面几种

  • AbortPolicy:抛出异常(默认)
  • CallerRunsPolicy:使用调用者所在线程执行任务
  • DiscardOldestPolicy:从workerQueue中poll一个任务,执行当前任务
  • DiscardPolicy:默默抛弃,什么都不做

线程数目

private volatile int corePoolSize; //当allowCoreThreadTimeOut是false时,线程池内维护的线程最小数目
private volatile int maximumPoolSize; // 线程池内会维持的最大数目

线程超时时间

private volatile boolean allowCoreThreadTimeOut; // 默认false,线程数小于corePoolSize的线程不会被回收
private volatile long keepAliveTime; // 线程数大于corePoolSize时创建的线程的空闲回收时间,allowCoreThreadTimeOut为ture时所有线程都会被回收

几个关键方法

execute

执行新任务的核心方法,调用submit也会执行该方法,具体执行流程如下:

  • 1.当线程池小于corePoolSize时,将创建一个新线程执行任务,即使此时线程池中存在空闲线程
  • 2.当线程池达到corePoolSize时,任务将被放入workQueue中,等待线程池中任务调度执行
  • 3.当workQueue已满,且maximumPoolSize > corePoolSize时,创建新线程执行任务
  • 4.当提交任务数超过maximumPoolSize时,任务由RejectedExecutionHandler处理
  • 5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程

addWorker

execute方法需要创建新线程执行任务时会调用addWorker方法,会将任务封装为一个Worker对象,加入HashSet中并启动相应工作线程。

// addWoker 部分代码
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start(); // 启动worker
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}

Worker是实现了Runable接口的内部类,对工作线程进行了封装。同时Worker继承了AQS类,其tryLock()方法为不可重入的独占锁,用于判断线程是否空闲以及是否可以被打断。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // Worker类本身实现了Runnable接口,线程调用start会直接触发runWorker
    }

    public void run() {
        runWorker(this);
    }
}

runWorker

runWorker()方法内维护一个循环,执行当前Worker对象的Task或调用getTask()方法从workerQueue中读取任务进行执行,方法中会调用两个钩子方法beforeExecute()afterExecute()在任务执行前后进行特定工作。

当前Worker对象没有Task或getTask()方法返回null时,会结束循环并调用processWorkerExit()方法将线程关闭。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    try {
        while (task != null || (task = getTask()) != null) {
            // ...
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

getTask()在下面几种情况将返回null:

  • 当前线程数即将超过maxPoolSize
  • 线程池被关闭
  • allowCoreThreadTimeOutfalse,线程数大于corePoolSize,并且从workerQueue取数据超过了keepAliveTime
  • allowCoreThreadTimeOuttrue,从workerQueue取数据超过了keepAliveTime

shutdown和shutdownNow

关闭(销毁)线程池,shutdown()会将线程池状态设置为SHUTDOWN,拒绝新任务加入,终端空闲线程,但已在队列里的任务会继续处理。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers(); // 中断空闲线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdownNow()方法则会强行关闭线程池,尝试将线程池状态设置为STOP,会中断所有线程。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers(); // 中断所有线程
        tasks = drainQueue(); // 取出队列中没有被执行的任务
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

Executors中的几种常见的线程池

FixedThreadPool

指定了最大线程数,采用LinkedBlockingQueue作为workerQueue。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor

只有一个线程的线程池,同样采用LinkedBlockingQueue作为workerQueue。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

CachedThreadPool

每个任务都会创建新县城执行任务,最大线程数为Integer.MAX_VALUE,采用同步阻塞队列SynchronousQueue作为workerQueue。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

ScheduledThreadPool

可以实现延迟、调度。最大线程数为Integer.MAX_VALUE,采用有序队列DelayedWorkQueue作为workerQueue(会按每个任务按照距离下次执行时间间隔的大小来排序)。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

阿里Java开发文档不允许使用Executors创建线程池

从上面可以看到,Executors给出的几种线程池都是有弊端的:

  • FixedThreadPoolSingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
  • CachedThreadPoolScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

因此,在实际使用时需要灵活配置参数,直接用ThreadPoolExecutor创建线程池。

(End)

参考资料:


Similar Posts

Comments