Executors解析

前言

Exectors 中定义了 ExecutorExecutorserviceScheduledExecutorServiceThreadFactoryCallable类的工厂和实用方法,提供了大量创建连接池的静态方法。Exectors 支持以下方法:

  1. 创建并返回一个带有常用配置设置的 Executorservice
  2. 创建并返回一个带有常用配置设置的 ScheduledExecutorService
  3. 创建并返回一个包装过的 Executorservice ,通过使实现特定的方法不可访问,来禁用重新配置。
  4. 创建并返回一个 ThreadFactory ,将新创建的线程设置为已知状态。
  5. 从其他类似闭包的窗体中创建并返回 Callable ,因此可以在需要 Callable 的执行方法中使用。
1
2
3
4
5
public class Executors {
//构造器私有化,只能通过类去直接调用静态方法,而不允许创建类的实例对象。
private Executors() {}
...
}

创建线程池

ThreadPoolExecutor 继承自 AbstractExecutorService 抽象类。

1
2
3
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • corePoolSize:核心线程数

  • maxmumPoolSize:池中允许的最大线程数

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止

  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    1
    2
    3
    4
    5
    6
    7
    TimeUnit.DAYS; //天
    TimeUnit.HOURS; //小时
    TimeUnit.MINUTES; //分钟
    TimeUnit.SECONDS; //秒
    TimeUnit.MILLISECONDS; //毫秒
    TimeUnit.MICROSECONDS; //微妙
    TimeUnit.NANOSECONDS; //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    1
    2
    3
    ArrayBlockingQueue // 采用数组实现的有界阻塞线程安全队列
    LinkedBlockingQueue // 通过单向链表实现的无界缓存阻塞队列
    SynchronousQueue // 无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
  • threadFactory:创建新线程的工厂。

  • handler:拒绝策略,当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    1
    2
    3
    4
    ThreadPoolExecutor.AbortPolicy //丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy //也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy //丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy //由调用线程处理该任务

创建固定大小的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** 创建一个固定大小的线程池,以共享的无界队列方式来运行这些线程 */
public static ExecutorService newFixedThreadPool(int nThreads) {
// 核心池的大小和池中允许的最大线程数均为nThreads
// 线程没有任务执行时会不会终止
// 线程排队采用的是LinkedBlockingQueue
// 使用默认的线程工厂创建新线程
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

/** 创建一个固定大小的线程池,以共享的无界队列方式来运行这些线程 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
//和上面相比,需要指定 ThreadFactory,在需要时使用提供的 ThreadFactory 创建新线程
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
}

/** 创建一个单个线程的线程池,以无界队列方式来运行该线程 */
public static ExecutorService newSingleThreadExecutor() {
// 使用默认的线程工厂创建新线程
// 和 Executors.newFixedThreadPool(1) 差不多的效果。
// 不一样的是newSingleThreadExecutor创建的线程池被一个FinalizableDelegatedExecutorService包装了一下
// FinalizableDelegatedExecutorService 继承了DelegatedExecutorService类并增加了一个finalize方法,finalize方法会在虚拟机利用垃圾回收清理对象时被调用。
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

/** 创建一个单个线程的线程池,以无界队列方式来运行该线程 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
// 和上面相比,需要指定 ThreadFactory,在需要时使用提供的 ThreadFactory 创建新线
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
}

创建可按需自动扩容的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
/** 创建一个可按需自动扩容的线程池,但优先重用线程池中空闲可用的线程 */
public static ExecutorService newCachedThreadPool() {
// 终止并从缓存中移除那些已有 60 秒钟未被使用的线程
// 采用SynchronousQueue
// 使用默认的线程工厂创建新线程
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

/** 创建一个可按需自动扩容的线程池,但优先重用线程池中空闲可用的线程 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
// 和上面相比,需要指定 ThreadFactory,在需要时使用提供的 ThreadFactory 创建新线
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}

创建定延时后执行异步任务或者周期性执行任务的线程池

DelegatedScheduledExecutorServiceScheduledExecutorService 的包装类,只向外暴露 ScheduledExecutorService 实现的 “schedule” 方法。

1
2
3
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor 实现 ScheduledExecutorService 接口,最终调用的还是 ThreadPoolExecutor 的构造方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** 创建一个单线程执行程序,它可安排在给定延迟后执行或者定期地执行 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
// 线程排队采用的是DelayedWorkQueue
// 核心线程数为1
// 池中允许的最大线程数为Integer.MAX_VALUE
// 使用默认的线程工厂创建新线程
// 线程没有任务执行时会不会终止
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

/** 创建一个单线程执行程序,它可安排在给定延迟后执行或者定期地执行 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
// 线程排队采用的是DelayedWorkQueue
// 核心线程数为1
// 池中允许的最大线程数为Integer.MAX_VALUE
// 使用指定的线程工厂创建新线程
// 线程没有任务执行时会不会终止
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

/** 创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
// 线程排队采用的是DelayedWorkQueue
// 核心线程数为corePoolSize
// 池中允许的最大线程数为Integer.MAX_VALUE
// 使用默认的线程工厂创建新线程
// 线程没有任务执行时不会终止
return new ScheduledThreadPoolExecutor(corePoolSize);
}

/** 创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
// 线程排队采用的是DelayedWorkQueue
// 核心线程数为corePoolSize
// 池中允许的最大线程数为Integer.MAX_VALUE
// 使用指定的线程工厂创建新线程
// 线程没有任务执行时会不会终止
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

创建工作窃取的线程池

1
2
3
4
5
6
7
8
9
10
11
12
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
  • parallelism:并行度,默认情况下跟我们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我们机器运行时可用的CPU个数。
  • factory:创建新线程的工厂。默认情况下使用 ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory
  • handler:线程异常情况下的处理器,该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为 null。
  • asyncMode:这个参数要注意,在 ForkJoinPool 中,每一个工作线程都有一个独立的任务队列,asyncMode 表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。工作线程在处理本地任务时使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 利用所有运行的处理器数目来创建一个工作窃取的线程池 */
public static ExecutorService newWorkStealingPool() {
// 采用默认的 ForkJoinWorkerThreadFactory 来创建新线程
// 并行度为 Java虚拟机可用的处理器数量。
// 采用 FIFO_QUEUE
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}

/** 根据给定的并行等级,创建一个拥有足够的线程数目的工作窃取的线程池 */
public static ExecutorService newWorkStealingPool(int parallelism) {
// 采用默认的 ForkJoinWorkerThreadFactory 来创建新线程
// 并行度为 parallelism
// 采用 FIFO_QUEUE
return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}

创建不可配置的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//返回一个将所有已定义的ExecutorService方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}

//返回一个将所有已定义的ScheduledExecutorService方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
// 类似Executors.newSingleThreadScheduledExecutor()
return new DelegatedScheduledExecutorService(executor);//ScheduledExecutorService继承了ExecutorService
}

其他方法

创建线程工厂

1
2
3
4
/** 返回用于创建新线程的默认线程工厂 */
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}

此工厂创建同一 ThreadGroup中 Executor 使用的所有新线程。如果有 SecurityManager,则它使用 System.getSecurityManager()组来调用此 defaultThreadFactory 方法,其他情况则使用线程组。每个新线程都作为非守护程序而创建,并且具有设置为 Thread.NORM_PRIORITY 中较小者的优先级以及线程组中允许的最大优先级。新线程具有可通过 pool-N-thread-MThread.getName()来访问的名称,其中 N 是此工厂的序列号, M 是此工厂所创建线程的序列号。

1
2
3
4
/** 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限 */
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}

此工厂创建具有与 defaultThreadFactory() 相同设置的线程,新线程的 AccessControlContextcontextClassLoader 的其他设置与调用此 privilegedThreadFactory 方法的线程相同,新线程与当前线程具有相同的权限。可以在 AccessController.doPrivileged(java.security.PrivilegedAction )操作中创建一个新 privilegedThreadFactory,设置当前线程的访问控制上下文,以便创建具有该操作中保持的所选权限的线程。

创建 Callable 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/** 返回 Callable 对象,调用它时可运行给定的任务并返回 null */
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}

/** 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果 */
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/** 返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果 */
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }
};
}

/** 返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果 */
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }
};
}

/** 返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象 */
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}

/** 返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象 */
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}

内部类

Callable相关

RunnableAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//实现了Callable接口,可运行给定的任务并返回给定的结果
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;

//构造器
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}

//实现了Callable接口定义的call方法
public T call() {
task.run();//运行所定义的Runnable方法
return result;//返回 T result
}
}

PrivilegedCallable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//实现了Callable接口
static final class PrivilegedCallable<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;

//构造器
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}

//实现了Callable接口定义的call方法
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}

PrivilegedCallableUsingCurrentClassLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//实现了Callable接口
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;

//构造器
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.task = task;
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}

//实现了Callable接口定义的call方法
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
return task.call();
} else {
t.setContextClassLoader(ccl);
try {
return task.call();
} finally {
t.setContextClassLoader(cl);
}
}
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}

线程工厂类

DefaultThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//实现ThreadFactory接口(创建默认线程工厂)
static class DefaultThreadFactory implements ThreadFactory {
//线程池大小
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//线程数
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称
private final String namePrefix;

//构造器
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();//获取安全管理器对象
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();//得到当前线程组
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";//线程名称初始化
}

//实现ThreadFactory接口:创建新线程的方法
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);//创建一个新的线程加入到当前线程组(线程名称加1)
if (t.isDaemon())//判断是否设置了后台守护标志
t.setDaemon(false);//设为后台线程
if (t.getPriority() != Thread.NORM_PRIORITY)//将线程优先级统统设置为5
t.setPriority(Thread.NORM_PRIORITY);//不为5的统统改成5
return t;//返回创建的线程对象
}
}

PrivilegedThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//继承DefaultThreadFactory,增加成员变量,重写newThread方法
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;

private final ClassLoader ccl;//类加载器

PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}

//重写newThread方法
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}

包装类

DelegatedExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//继承AbstractExecutorService抽象类
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;

//构造器
DelegatedExecutorService(ExecutorService executor) { e = executor; }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public void execute(Runnable command) { e.execute(command); }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public void shutdown() { e.shutdown(); }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public List<Runnable> shutdownNow() { return e.shutdownNow(); }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public boolean isShutdown() { return e.isShutdown(); }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public boolean isTerminated() { return e.isTerminated(); }

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public Future<?> submit(Runnable task) {
return e.submit(task);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}

//具体实现依赖于传入的ExecutorService的实现类中定义的方法
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

FinalizableDelegatedExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
//继承DelegatedExecutorService类,重写了finalize()回收方法
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
//构造器
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}

//重写finalize()回收方法
protected void finalize() {
super.shutdown();
}
}

DelegatedScheduledExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//在DelegatedExecutorService的基础上,增加了对ScheduledExecutorService接口的实现
static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService {
private final ScheduledExecutorService e;

//构造器
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}

//具体实现依赖于ScheduledExecutorService接口的实现
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}

//具体实现依赖于ScheduledExecutorService接口的实现
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}

//具体实现依赖于ScheduledExecutorService接口的实现
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}

//具体实现依赖于ScheduledExecutorService接口的实现
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

总结

通过 Executors 提供四种常用线程池:newFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecutornewScheduledThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
// 创建固定数目线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads);

// 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。
// 如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newCachedThreadPool();

// 创建一个单线程化的Executor。
public static ExecutorService newSingleThreadExecutor();

// 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

newFixedThreadPool 创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,提交的任务则进入队列等待,等着有闲置的线程来执行这些任务。它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
Runnable syncRunnable = new Runnable() {
public void run() {
//...
}
};
executorService.execute(syncRunnable);
}

newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

  • 工作线程的创建数量最大为 Interger. MAX_VALUE, 这样可灵活的往线程池中添加线程。
  • 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
  • 如果存在某一线程持续一段时间没有工作(默认为1分钟),则该线程就会销毁回收。
1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
Runnable syncRunnable = new Runnable() {
public void run() {
//...
}
};
executorService.execute(syncRunnable);
}

*newSingleThreadExecutor *创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。特点:有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则。

1
2
3
4
5
6
7
8
9
10
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
Runnable syncRunnable = new Runnable() {
@Override
public void run() {
//...
}
};
executorService.execute(syncRunnable);
}

newScheduledThreadPool创建一个定长的线程池,而且支持定时或周期性的任务执行。主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

1
2
3
4
5
6
7
8
9
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 20; i++) {
Runnable syncRunnable = new Runnable() {
public void run() {
//...
}
};
executorService.schedule(syncRunnable, 5000, TimeUnit.MILLISECONDS);//延迟5秒执行
}
1
2
3
4
//延迟3秒后执行任务,从开始执行任务开始计时,每7秒执行一次不管执行任务需要多长的时间
executorService.scheduleAtFixedRate(new Runnable(),3, 7, TimeUnit.SECONDS);
//延迟3秒后执行任务,从任务完成时开始计时,每7秒执行一次需要等到任务执行完成才开始计时
executorService.scheduleWithFixedDelay(new Runnable(),3, 7, TimeUnit.SECONDS);

线程池不推荐使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式去创建,这样的处理方式让线程池的创建者更加明确线程池的运行规则,规避资源耗尽的风险。