Java 线程总结(十二) —— 异步任务执行服务
1、Java 并发包提供了一套框架,大大简化了执行异步任务所需的开发。在之前的学习中,线程 Thread 既表示要执行的任务,又表示执行的机制,而这套框架引入了一个「执行服务」的概念,它将「任务的提交」和「任务的执行」相分离,「执行服务」封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务、获取结果、取消任务,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。
2、任务执行服务涉及的基本接口:
- Runnable 和 Callable:表示要执行的异步任务。
- Executor 和 ExecutorService:表示执行服务。
- Future:表示异步任务的结果。
Runnable 和 Callable 都表示任务,Runnable 没有返回结果,而 Callable 有,Runnable 不会抛出异常,而 Callable 会。
Executor 表示最简单的执行服务,其定义为:1
2
3public interface Executor {
void execute(Runnable command);
}
在未来某个时间执行给定的任务 Runnable,没有返回结果。该任务可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。
ExecutorService 扩展了 Executor,定义了更多服务,基本方法有:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public interface ExecutorService extends Executor {
// 这三个 submit 都表示提交一个任务,返回值类型都是 Future,返回后,只是表示任务已提交,不代表已执行,通过 Future 可以查询异步任务的状态、获取最终结果、取消任务等
// 对于 Callable,任务最终有个返回值,而对于 Runnable 是没有返回值的
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 的方法可以同时提供一个结果,在异步任务结束时返回
<T> Future<T> submit(Runnable task, T result);
// 异步任务的最终返回值为 null
Future<?> submit(Runnable task);
// shutdown 和 shutdownNow 表示关闭任务执行服务
// shutdown 表示不再接受新任务,但已提交的任务会继续执行,即使任务还未开始执行
void shutdown();
// shutdownNow 不仅不接受新任务,已提交但尚未执行的任务会被终止,对于正在执行的任务,一般会调用线程的 interrupt 方法尝试中断,不过,线程可能不响应中断,shutdownNow 会返回已提交但尚未执行的任务列表
List<Runnable> shutdownNow();
// shutdown 和 shutdownNow 不会阻塞等待,它们返回后不代表所有任务都已结束,不过 isShutdown 方法会返回 true
boolean isShutdown();
boolean isTerminated();
// 调用者可以通过 awaitTermination 等待所有任务结束,它可以限定等待的时间,如果超时前所有任务都结束了,即 isTerminated 方法返回 true,则返回 true,否则返回 false
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// ExecutorService 有两组批量提交任务的方法,invokeAll 和 invokeAny,它们都有两个版本,其中一个限定等待时间
// invokeAll 等待所有任务完成,返回的 Future 列表中,每个 Future 的 isDone 方法都返回 true,不过 isDone 为 true 不代表任务就执行成功了,可能是被取消了,invokeAll 可以指定等待时间,如果超时后有的任务没完成,就会被取消
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// invokeAny 只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消,如果没有任务能在限时内成功返回,抛出 TimeoutException,如果限时内所有任务都结束了,但都发生了异常,抛出 ExecutionException
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
使用 ExecutorService 编写并发异步任务的代码就像写顺序程序一样,不用关心线程的创建和协调,只需要提交任务、处理结果就可以了,大大简化了开发工作。
3、Future 接口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// Future 表示异步计算的结果
public interface Future<V> {
// cancel 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消,cancel 返回 false,否则返回 true
// 如果任务还未开始或已经结束,则不再运行
// 如果任务已经在运行,则不一定能取消,参数 mayInterruptIfRunning 表示,如果任务正在执行,是否调用 interrupt 方法中断线程,如果为 false 就不会,如果为 true,就会尝试中断线程,interrupt() 不一定能取消线程
boolean cancel(boolean mayInterruptIfRunning);
// isDone 和 isCancelled 用于查询任务状态
// isCancelled 表示任务是否被取消,只要 cancel 方法返回了 true,随后的 isCancelled 方法都会返回 true,即使执行任务的线程还未真正结束
boolean isCancelled();
// isDone 表示任务是否结束,不管什么原因都算,可能是任务正常结束、可能是任务抛出了异常、也可能是任务被取消
boolean isDone();
// get 用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待
V get() throws InterruptedException, ExecutionException;
// 可以限定阻塞等待的时间,如果超时任务还未结束,会抛出 TimeoutException
V get(long timeout, TimeUnit unit) throws InterruptedException,ExecutionException, TimeoutException;
}
get() 方法,任务最终大概有三个结果:
- 正常完成,get 方法会返回其执行结果,如果任务是 Runnable 且没有提供结果,返回 null。
- 任务执行抛出了异常,get 方法会将异常包装为 ExecutionException 重新抛出,通过异常的 getCause 方法可以获取原异常。
- 任务被取消了,get 方法会抛出异常 CancellationException。
如果调用 get 方法的线程被中断了,get 方法会抛出 InterruptedException。
Future 是一个重要的概念,是实现「任务的提交」与「任务的执行」相分离的关键,是其中的纽带,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。
4、ExecutorService 的主要实现类是 ThreadPoolExecutor,它是基于线程池实现的。ExecutorService 有一个抽象实现类 AbstractExecutorService,接下来简要分析下 AbstractExecutorService 的原理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// AbstractExecutorService 提供了 submit、invokeAll、invokeAny 的默认实现,子类只需要实现如下方法即可
public abstract class AbstractExecutorService implements ExecutorService {
// 除了 execute 方法,其他方法都与执行服务的生命周期管理有关
public void shutdown()
public List<Runnable> shutdownNow()
public boolean isShutdown()
public boolean isTerminated()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
// submit/invokeAll/invokeAny 最终都会调用 execute,execute 决定了到底如何执行任务
public void execute(Runnable command)
}
5、AbstractExecutorService 抽象类中 submit() 方法源码解析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 调用 newTaskFor 生成了一个 RunnableFuture,RunnableFuture 是一个接口,既扩展了 Runnable,又扩展了 Future,没有定义新方法
// 作为 Runnable,它表示要执行的任务,传递给 execute 方法进行执行
// 作为 Future,它又表示任务执行的异步结果
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
----------------------
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 就是创建了一个 FutureTask 对象,FutureTask 实现了 RunnableFuture 接口
return new FutureTask<T>(runnable, value);
}
----------------------
public FutureTask(Runnable runnable, V result) {
// 如果 FutureTask 接受的是一个 Runnable 对象,它会调用 Executors.callable 转换为 Callable 对象
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
---------------------
// RunnableFuture 接口定义
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
6、FutureTask 源码解析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61// Future 的主要实现类是 FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
// 表示状态,可取值如下
private volatile int state;
private static final int NEW = 0;// 刚开始的状态,或任务在运行
private static final int COMPLETING = 1;// 临时状态,任务即将结束,在设置结果
private static final int NORMAL = 2;// 任务正常执行完成
private static final int EXCEPTIONAL = 3;// 任务执行抛出异常结束
private static final int CANCELLED = 4;// 任务被取消
private static final int INTERRUPTING = 5;// 任务在被中断
private static final int INTERRUPTED = 6;// 任务被中断
// 表示待执行的任务
private Callable<V> callable;
// 表示最终的执行结果或异常
private Object outcome; // non-volatile, protected by state reads/writes
// 表示运行任务的线程
private volatile Thread runner;
// 单向链表表示等待任务执行结果的线程
private volatile WaitNode waiters;
// FutureTask 的构造方法会初始化 callable 和状态,如果 FutureTask 接受的是一个 Runnable 对象,它会调用 Executors.callable 转换为 Callable 对象
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// 任务执行服务会使用一个线程执行 FutureTask 的 run 方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
FutureTask 的 run() 方法的基本逻辑为:
- 调用 callable 的 call 方法,捕获任何异常。
- 如果正常执行完成,调用 set 设置结果,保存到 outcome。
- 如果执行过程发生异常,调用 setException 设置异常,异常也是保存到 outcome,但状态不一样。
- set 和 setException 除了设置结果,修改状态外,还会调用 finishCompletion ,它会唤醒所有等待结果的线程。
7、FutureTask 中 get() 方法源码解析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果任务还未执行完毕,就等待
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 最后调用 report 报告结果, report 根据状态返回结果或抛出异常
return report(s);
}
-----------------------------------
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
8、FutureTask 中 cancel() 方法源码解析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public boolean cancel(boolean mayInterruptIfRunning) {
// 如果任务已结束或取消,返回 false
// 如果 mayInterruptIfRunning 为 false,设置状态为 CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果 mayInterruptIfRunning 为 true,调用 interrupt 中断线程,设置状态为 INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 调用 finishCompletion 唤醒所有等待结果的线程
finishCompletion();
}
return true;
}
9、Java 并发包中任务执行服务的基本概念和原理,该服务体现了并发异步开发中「关注点分离」的思想,使用者只需要通过 ExecutorService 提交任务,通过 Future 操作任务和结果即可,不需要关注线程创建和协调的细节。