Fork me on GitHub

Java 线程总结(十四)

Java 线程总结(十四) —— CompletionService

1、在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java 并发包提供了一个方便的方法,使用 CompletionService,这是一个接口,它的实现类是 ExecutorCompletionService。

2、与 ExecutorService 一样,CompletionService 也可以提交异步任务,它的不同是,它可以按任务完成顺序获取结果,其具体定义为:

1
2
3
4
5
6
7
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

submit 方法与 ExecutorService 是一样的,多了 take 和 poll 方法,它们都是获取下一个完成任务的结果,take() 会阻塞等待,poll() 会立即返回,如果没有已完成的任务,返回 null,带时间参数的 poll 方法会最多等待限定的时间。

2、CompletionService 的主要实现类是 ExecutorCompletionService,它依赖于一个 Executor 完成实际的任务提交,而自己主要负责结果的排队和处理。它的构造方法有两个:

1
2
public ExecutorCompletionService(Executor executor)
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

至少需要一个 Executor 参数,可以提供一个 BlockingQueue 参数,用作完成任务的队列,没有提供的话,ExecutorCompletionService 内部会创建一个 LinkedBlockingQueue。

3、ExecutorCompletionService 是怎么让结果有序处理的呢?
因为它有一个额外的队列,每个任务完成之后,都会将代表结果的 Future 入队。在 FutureTask 中,任务完成后,不管是正常完成、异常结束、还是被取消,都会调用 finishCompletion 方法,而该方法会调用一个 done 方法 protected void done() { } 该方法的实现为空,但它是一个 protected 方法,子类可以重写该方法。ExecutorCompletionService 的内部类 QueueingFuture 中重写了该方法。

在 ExecutorCompletionService 中,提交的任务类型不是一般的 FutureTask,而是一个子类 QueueingFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 注意 QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
----------------------------
// ExecutorCompletionService 类中的私有实例变量
private final BlockingQueue<Future<V>> completionQueue;

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 子类重写了 done 方法,在任务完成时将结果加入到完成队列中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
---------------------------

而 ExecutorCompletionService 的 take/poll 方法就是从该队列获取结果:

1
2
3
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

4、AbstractExecutorService 的 invokeAny 的实现,就利用了 ExecutorCompletionService,它的基本思路是,提交任务后,通过 take 方法获取结果,获取到第一个有效结果后,取消所有其他任务。

5、CompletionService 它通过一个额外的结果队列,方便了对于多个异步任务结果的处理。

参考博客

Java编程的逻辑 - 方便的 CompletionService

-------------本文结束感谢您的阅读-------------

本文标题:Java 线程总结(十四)

文章作者:Yan ChongSheng

发布时间:2018年08月28日

最后更新:2018年08月28日

原始链接:yanchongsheng.github.io/2018/08/28/Java-Java-Thread-2018-08-28-Java线程总结-十四/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

开启打赏模式