前言
在 Exectors
中定义了 Executor
、Executorservice
、ScheduledExecutorService
、ThreadFactory
和Callable
类的工厂和实用方法,提供了大量创建连接池的静态方法。Exectors
支持以下方法:
- 创建并返回一个带有常用配置设置的
Executorservice
。 - 创建并返回一个带有常用配置设置的
ScheduledExecutorService
。 - 创建并返回一个包装过的
Executorservice
,通过使实现特定的方法不可访问,来禁用重新配置。 - 创建并返回一个
ThreadFactory
,将新创建的线程设置为已知状态。 - 从其他类似闭包的窗体中创建并返回
Callable
,因此可以在需要 Callable 的执行方法中使用。
1 | public class Executors { |
创建线程池
ThreadPoolExecutor
继承自 AbstractExecutorService
抽象类。
1 | public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, |
corePoolSize:核心线程数
maxmumPoolSize:池中允许的最大线程数
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
1
2
3
4
5
6
7TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
1
2
3ArrayBlockingQueue // 采用数组实现的有界阻塞线程安全队列
LinkedBlockingQueue // 通过单向链表实现的无界缓存阻塞队列
SynchronousQueue // 无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。threadFactory:创建新线程的工厂。
handler:拒绝策略,当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
1
2
3
4ThreadPoolExecutor.AbortPolicy //丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy //也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy //丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy //由调用线程处理该任务
创建固定大小的线程池
1 | /** 创建一个固定大小的线程池,以共享的无界队列方式来运行这些线程 */ |
创建可按需自动扩容的线程池
1 | /** 创建一个可按需自动扩容的线程池,但优先重用线程池中空闲可用的线程 */ |
创建定延时后执行异步任务或者周期性执行任务的线程池
DelegatedScheduledExecutorService
是 ScheduledExecutorService
的包装类,只向外暴露 ScheduledExecutorService
实现的 “schedule” 方法。
1 | public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) { |
ScheduledThreadPoolExecutor
继承 ThreadPoolExecutor
实现 ScheduledExecutorService
接口,最终调用的还是 ThreadPoolExecutor
的构造方法。
1 | /** 创建一个单线程执行程序,它可安排在给定延迟后执行或者定期地执行 */ |
创建工作窃取的线程池
1 | private ForkJoinPool(int parallelism, |
- parallelism:并行度,默认情况下跟我们机器的cpu个数保持一致,使用
Runtime.getRuntime().availableProcessors()
可以得到我们机器运行时可用的CPU个数。 - factory:创建新线程的工厂。默认情况下使用
ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory
。 - handler:线程异常情况下的处理器,该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为 null。
- asyncMode:这个参数要注意,在
ForkJoinPool
中,每一个工作线程都有一个独立的任务队列,asyncMode 表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。工作线程在处理本地任务时使用 FIFO 顺序。这种模式下的ForkJoinPool
更接近于是一个消息队列,而不是用来处理递归式的任务。
1 | /** 利用所有运行的处理器数目来创建一个工作窃取的线程池 */ |
创建不可配置的线程池
1 | //返回一个将所有已定义的ExecutorService方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法 |
其他方法
创建线程工厂
1 | /** 返回用于创建新线程的默认线程工厂 */ |
此工厂创建同一 ThreadGroup
中 Executor 使用的所有新线程。如果有 SecurityManager
,则它使用 System.getSecurityManager()
组来调用此 defaultThreadFactory 方法,其他情况则使用线程组。每个新线程都作为非守护程序而创建,并且具有设置为 Thread.NORM_PRIORITY 中较小者的优先级以及线程组中允许的最大优先级。新线程具有可通过 pool-N-thread-M 的 Thread.getName()
来访问的名称,其中 N 是此工厂的序列号, M 是此工厂所创建线程的序列号。
1 | /** 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限 */ |
此工厂创建具有与 defaultThreadFactory()
相同设置的线程,新线程的 AccessControlContext
和 contextClassLoader
的其他设置与调用此 privilegedThreadFactory 方法的线程相同,新线程与当前线程具有相同的权限。可以在 AccessController.doPrivileged(java.security.PrivilegedAction )
操作中创建一个新 privilegedThreadFactory,设置当前线程的访问控制上下文,以便创建具有该操作中保持的所选权限的线程。
创建 Callable 对象
1 | /** 返回 Callable 对象,调用它时可运行给定的任务并返回 null */ |
内部类
Callable相关
RunnableAdapter
1 | //实现了Callable接口,可运行给定的任务并返回给定的结果 |
PrivilegedCallable
1 | //实现了Callable接口 |
PrivilegedCallableUsingCurrentClassLoader
1 | //实现了Callable接口 |
线程工厂类
DefaultThreadFactory
1 | //实现ThreadFactory接口(创建默认线程工厂) |
PrivilegedThreadFactory
1 | //继承DefaultThreadFactory,增加成员变量,重写newThread方法 |
包装类
DelegatedExecutorService
1 | //继承AbstractExecutorService抽象类 |
FinalizableDelegatedExecutorService
1 | //继承DelegatedExecutorService类,重写了finalize()回收方法 |
DelegatedScheduledExecutorService
1 | //在DelegatedExecutorService的基础上,增加了对ScheduledExecutorService接口的实现 |
总结
通过 Executors
提供四种常用线程池:newFixedThreadPool
、newCachedThreadPool
、newSingleThreadExecutor
、newScheduledThreadPool
。
1 | // 创建固定数目线程的线程池。 |
newFixedThreadPool 创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,提交的任务则进入队列等待,等着有闲置的线程来执行这些任务。它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
1 | ExecutorService executorService = Executors.newFixedThreadPool(5); |
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
- 工作线程的创建数量最大为 Interger. MAX_VALUE, 这样可灵活的往线程池中添加线程。
- 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
- 如果存在某一线程持续一段时间没有工作(默认为1分钟),则该线程就会销毁回收。
1 | ExecutorService executorService = Executors.newCachedThreadPool(); |
*newSingleThreadExecutor *创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。特点:有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则。
1 | ExecutorService executorService = Executors.newSingleThreadExecutor(); |
newScheduledThreadPool创建一个定长的线程池,而且支持定时或周期性的任务执行。主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
1 | ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); |
1 | //延迟3秒后执行任务,从开始执行任务开始计时,每7秒执行一次不管执行任务需要多长的时间 |
线程池不推荐使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式去创建,这样的处理方式让线程池的创建者更加明确线程池的运行规则,规避资源耗尽的风险。