`
BrokenDreams
  • 浏览: 248706 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97810
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(21)-ExecutorCompletionService

阅读更多

Jdk1.6 JUC源码解析(21)-ExecutorCompletionService

作者:大飞

 

功能简介:
  • ExecutorCompletionService用于执行一批任务,然后按照任务执行完成的顺序来获取任务结果。你甚至可以在获取到了若干个执行结果后,把其他的任务取消掉(ThreadPoolExecutor中的invokeAny就是通过这货实现的)。比如这样的场景:你的业务需要调用10个接口来获取一些信息,业务规定只需要其中任意2个接口的信息,那么就可以使用ExecutorCompletionService,获取前两个成功完成的任务结果,然后将其他的任务取消。
源码分析:
  • ExecutorCompletionService实现了CompletionService接口,先看下这个接口:
public interface CompletionService<V> {
    /**
     * 提交一个有返回值的任务。
     * 一旦这个任务完成,就能通过taken或者poll方法来获取任务结果。
     */
    Future<V> submit(Callable<V> task);
    /**
     * 提交一个Runnable和一个返回值。
     * 一旦这个任务完成,就能通过taken或者poll方法来获取这个返回值。
     */
    Future<V> submit(Runnable task, V result);
    /**
     * 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待。
     */
    Future<V> take() throws InterruptedException;
    /**
     * 获取并移除下一个完成的任务,如果当前没有任务完成,返回null。
     */
    Future<V> poll();
    /**
     * 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待,
     * 如果在超时前仍然没有任务完成,返回null。
     */
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

 

 

  • 看下ExecutorCompletionService的代码,首先看下内部结构:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); //异步任务完成后,将其放入完成队列}
        private final Future<V> task;
    }

       通过代码可见,内部有用来实际执行任务的executor和用来存放完成任务的阻塞队列(注意到内部的QueueingFuture覆盖了FutureTask的钩子方法done,在任务完成后会将其放到完成队列里面)。

       看下构造方法:
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
 
       构造方法中,当传入的executor属于AbstractExecutorService类型,会将这个executor同时赋给aes,主要用于内部包装异步任务: 
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }
 
 
  • 最后看下功能方法:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    public Future<V> poll() {
        return completionQueue.poll();
    }
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

       方法很简单,提交到ExecutorCompletionService的任务会在内部被包装成QueueingFuture,并由内部的executor来执行这个任务,当任务执行完成后,会被加入到内部的队列里面,外部程序就可以通过take或者poll方法来获取完成的任务了。

 
 
       ExecutorCompletionService的代码解析完毕!
 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics