Jdk1.6 JUC源码解析(21)-ExecutorCompletionService
作者:大飞
功能简介:
- ExecutorCompletionService用于执行一批任务,然后按照任务执行完成的顺序来获取任务结果。你甚至可以在获取到了若干个执行结果后,把其他的任务取消掉(ThreadPoolExecutor中的invokeAny就是通过这货实现的)。比如这样的场景:你的业务需要调用10个接口来获取一些信息,业务规定只需要其中任意2个接口的信息,那么就可以使用ExecutorCompletionService,获取前两个成功完成的任务结果,然后将其他的任务取消。
源码分析:
- ExecutorCompletionService实现了CompletionService接口,先看下这个接口:
public interface CompletionService<V> { /** * 提交一个有返回值的任务。 * 一旦这个任务完成,就能通过taken或者poll方法来获取任务结果。 */ Future<V> submit(Callable<V> task); /** * 提交一个Runnable和一个返回值。 * 一旦这个任务完成,就能通过taken或者poll方法来获取这个返回值。 */ Future<V> submit(Runnable task, V result); /** * 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待。 */ Future<V> take() throws InterruptedException; /** * 获取并移除下一个完成的任务,如果当前没有任务完成,返回null。 */ Future<V> poll(); /** * 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待, * 如果在超时前仍然没有任务完成,返回null。 */ Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
- 看下ExecutorCompletionService的代码,首先看下内部结构:
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * FutureTask extension to enqueue upon completion */ private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); //异步任务完成后,将其放入完成队列} private final Future<V> task; }
通过代码可见,内部有用来实际执行任务的executor和用来存放完成任务的阻塞队列(注意到内部的QueueingFuture覆盖了FutureTask的钩子方法done,在任务完成后会将其放到完成队列里面)。
看下构造方法:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
构造方法中,当传入的executor属于AbstractExecutorService类型,会将这个executor同时赋给aes,主要用于内部包装异步任务:
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
- 最后看下功能方法:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
方法很简单,提交到ExecutorCompletionService的任务会在内部被包装成QueueingFuture,并由内部的executor来执行这个任务,当任务执行完成后,会被加入到内部的队列里面,外部程序就可以通过take或者poll方法来获取完成的任务了。
ExecutorCompletionService的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
jdk-jdk1.6.0.24-windows-i586.exe
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
logback-cfca-jdk1.6-3.1.0.0.jar
文档介绍了JDK1.6安装及与JDK-1.5版本共存,已经在我的机子上通过了测试,按照文档操作即可。
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
JDK-1.6-Windows-32位 纯官方安装版 JDK-1.6-Windows-32位 纯官方安装版
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
Java编程开发工具包,最新版本,很好用,经典
1、okhttp3.8源码使用jdk1.6重新编译,已集成了okio,仅在javaweb项目中使用。 2、另附json-20160810.jar
jdk1.6-jdk-6u43-windows32-i586
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64