Fork me on GitHub

Java线程总结(十三)

Java 线程总结(十三) —— 线程池

1、Java 并发包中的任务执行服务的主要实现机制是线程池。线程池,顾名思义,就是一个线程的池子,里面有若干线程,它们的目的就是执行提交给线程池的任务,执行完一个任务后不会退出,而是继续等待或执行新任务。线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程,工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。

线程池的优点:

  • 它可以重用线程,避免线程创建的开销。
  • 在任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。

Java 并发包中线程池的实现类是 ThreadPoolExecutor,它继承自 AbstractExecutorService,实现了 ExecutorService。

2、ThreadPoolExecutor 构造方法中的参数解析

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

线程池的大小主要与四个参数有关:

  • corePoolSize:线程池中核心线程个数。不过,这并不是说,一开始就创建这么多线程,刚创建一个线程池后,实际上并不会创建任何线程。一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSiz,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于 corePoolSiz,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是「尝试」排队,而不是「阻塞等待」入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了 maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到 maximumPoolSize。

  • maximumPoolSize:线程池中允许的最大线程数。

  • keepAliveTime:当实际线程数大于核心线程数时,此为终止前,多余的空闲线程,等待新任务的最长时间。keepAliveTime 的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于 corePoolSize 时,额外空闲线程的存活时间,也就是说,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即 keepAliveTime,如果到了时间还是没有新任务,就会被终止。如果该值为 0,表示所有线程都不会超时终止。

  • unit:keepAliveTime 参数的时间单位。

3、队列之线程参数「BlockingQueue workQueue」
ThreadPoolExecutor 要求的队列类型是阻塞队列 BlockingQueue:

  • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
  • ArrayBlockingQueue:基于数组的有界阻塞队列。
  • PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
  • SynchronousQueue:没有实际存储空间的同步阻塞队列。

如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize,到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。

对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到 maximumPoolSize。

4、任务拒绝策略之线程参数「RejectedExecutionHandler handler」
如果队列有界,且 maximumPoolSize 有限,则当队列排满,线程个数也达到了 maximumPoolSize,这时,新任务来了,就会触发线程池的任务拒绝策略。默认情况下,提交任务的方法如 execute/submit/invokeAll 等会抛出异常,类型为 RejectedExecutionException。拒绝策略是可以自定义的,ThreadPoolExecutor 实现了四种处理方式:

  1. ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常。
  2. ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛异常,也不执行。
  3. ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
  4. ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。

它们都是 ThreadPoolExecutor 的 public 静态内部类,都实现了 RejectedExecutionHandler 接口,RejectedExecutionHandler 接口定义为:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

当线程池不能接受任务时,调用其拒绝策略的 rejectedExecution 方法。

ThreadPoolExecutor 中的拒绝策略可以在构造方法中进行指定,也可以通过如下方法进行指定:public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

默认的 RejectedExecutionHandler 是一个 AbortPolicy 实例 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); 而 AbortPolicy 的 rejectedExecution 实现就是抛出异常。

拒绝策略只有在队列有界,且 maximumPoolSize 有限的情况下才会触发。 如果队列无界,服务不了的任务总是会排队,请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。如果队列有界但 maximumPoolSize 无限,可能会创建过多的线程,占满 CPU 和内存,使得任何任务都难以完成。 在任务量非常大的场景中,让拒绝策略有机会执行是保证系统稳定运行很重要的方面。

5、线程工厂之线程参数「ThreadFactory threadFactory」
线程池还可以接受一个参数 ThreadFactory,它是一个接口,定义为:

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

ThreadFactory 接口根据 Runnable 创建一个 Thread,ThreadPoolExecutor 的默认实现是 Executors 类中的静态内部类 DefaultThreadFactory,主要就是创建一个线程,给线程设置一个名称,设置 daemon 属性为 false,设置线程优先级为标准默认优先级,线程名称的格式为: pool-<线程池编号>-thread-<线程编号>。

6、关于核心线程的特殊配置
线程个数小于等于 corePoolSize 时,我们称这些线程为核心线程,默认情况下:

  • 核心线程不会预先创建,只有当有任务时才会创建。
  • 核心线程不会因为空闲而被终止,keepAliveTime参数不适用于它。

ThreadPoolExecutor 有如下方法,可以改变这个默认行为:

1
2
3
4
5
6
// 预先创建所有的核心线程
public int prestartAllCoreThreads()
// 创建一个核心线程,如果所有核心线程都已创建,返回 false
public boolean prestartCoreThread()
// 如果参数为 true,则 keepAliveTime 参数也适用于核心线程
public void allowCoreThreadTimeOut(boolean value)

7、工厂类 Executors
类 Executors 提供了一些静态工厂方法,可以方便的创建一些预配置的线程池,主要方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// newSingleThreadExecutor 只使用一个线程,使用无界队列 LinkedBlockingQueue,线程创建后不会超时终止,该线程顺序执行所有任务。该线程池适用于需要确保所有任务被顺序执行的场合
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
----------------------------------------------------------------------
// newFixedThreadPool 使用固定数目的 n 个线程,使用无界队列 LinkedBlockingQueue,线程创建后不会超时终止。和 newSingleThreadExecutor 一样,由于是无界队列,如果排队任务过多,可能会消耗非常大的内存
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
---------------------------------------------------------------------
// newCachedThreadPool 的 corePoolSize 为 0,maximumPoolSize 为 Integer.MAX_VALUE,keepAliveTime 是 60 秒,队列为 SynchronousQueue
// 当新任务到来时,如果正好有空闲线程在等待任务,则其中一个空闲线程接受该任务,否则就总是创建一个新线程,创建的总线程个数不受限制,对任一空闲线程,如果60秒内没有新任务,就终止
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

实际中,应该使用 newFixedThreadPool 还是 newCachedThreadPool 呢?
在系统负载很高的情况下,newFixedThreadPool 可以通过队列使新任务排队,保证有足够的资源处理实际的任务,而 newCachedThreadPool 会为每个任务创建一个线程,导致创建过多的线程竞争 CPU 和内存资源,使得任何实际任务都难以完成,这时,newFixedThreadPool 更为适用。

如果系统负载不太高,单个任务的执行时间也比较短,newCachedThreadPool 的效率可能更高,因为任务可以不经排队,直接交给某一个空闲线程。

在系统负载可能极高的情况下,两者都不是好的选择,newFixedThreadPool 的问题是队列过长,而 newCachedThreadPool 的问题是线程过多,这时,应根据具体情况自定义 ThreadPoolExecutor,传递合适的参数。

8、线程的死锁
关于提交给线程池的任务,需要特别注意一种情况,就是任务之间有依赖,这种情况可能会出现死锁。对于相互依赖的任务,需要特别注意,避免出现死锁。

避免死锁的策略:

  • 替换 newFixedThreadPool 为 newCachedThreadPool,让创建线程不再受限。
  • 使用 SynchronousQueue,它可以避免死锁。因为对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue 来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

9、ThreadPoolExecutor 实现了生产者/消费者模式,工作者线程就是消费者,任务提交者就是生产者,线程池自己维护任务队列。当我们碰到类似生产者/消费者问题时,应该优先考虑直接使用线程池,而非重新发明轮子,自己管理和维护消费者线程及任务队列。

10、小结:ThreadPoolExecutor 参数 corePoolSize, maximumPoolSize, keepAliveTime, unit 用于控制线程池中线程的个数,workQueue 表示任务队列,threadFactory 用于对创建的线程进行一些配置,handler 表示任务拒绝策略。

参考博客

Java编程的逻辑 - 线程池

-------------本文结束感谢您的阅读-------------

本文标题:Java线程总结(十三)

文章作者:Yan ChongSheng

发布时间:2018年08月27日

最后更新:2018年08月27日

原始链接:yanchongsheng.github.io/2018/08/27/Java-Java-Thread-2018-08-27-Java线程总结-十三/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

开启打赏模式