今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

ForkJoinTask

概述

ForkJoinTask是一个在ForkJoinPool中运行的任务的抽象基类。ForkJoinTask是一种类似线程的实体,比普通线程轻得多。在ForkJoinPool中,大量的任务和子任务可以由少量的实际线程托管,以牺牲一些使用限制为代价。

一个"主"ForkJoinTask在显式提交到ForkJoinPool时开始执行,或者如果还没有参与ForkJoin计算,则通过fork、invoke或相关方法在ForkJoinPool.commonPool()中启动。一旦开始,它通常会继续开始其他子任务。如该类名称所示,许多使用ForkJoinTask的程序仅使用fork和join或其衍生方法(例如invokeAll)。然而,这个类也提供了许多其他方法,在高级用法中可能会涉及,以及扩展机制,支持新形式的分治/合并处理。

ForkJoinTask是Future的轻量级形式。ForkJoinTask的效率源于一组限制(只是部分静态可强制执行的),反映了它们作为计算纯函数或在纯独立对象上操作的计算任务的主要用途。主要的协调机制是fork,安排异步执行,和join,在任务的结果计算完成之前不会继续。计算理论上应该避免使用同步方法或块,并应尽量减少其他阻塞同步,除了加入其他任务或使用诸如Phaser之类的同步器,它们被宣布与分治/合并调度合作。可分割的任务也不应执行阻塞I/O,并且理想情况下应访问与其他运行任务完全独立的变量。这些指导方针通过不允许抛出诸如IOException之类的受检异常而得到松散执行。然而,计算仍然可能遇到未经检查的异常,这些异常会重新抛回试图加入它们的调用者。这些异常还可能包括来自内部资源耗尽的RejectedExecutionException,例如分配内部任务队列失败。重新抛出的异常的行为与常规异常相同,但是,如果可能,它们包含堆栈跟踪(例如使用ex.printStackTrace()显示)

可以定义和使用可能阻塞的 ForkJoinTasks,但这需要考虑三个方面:(1) 对于阻塞于外部同步或 I/O 的任务,很少有其他任务的完成依赖于它。从不进行联合的事件风格异步任务(例如,那些子类化 CountedCompleter)通常属于这一类。(2) 为了最大限度地减小资源影响,任务应该小巧;理想情况下仅执行(可能)阻塞操作。(3) 除非使用 ForkJoinPool.ManagedBlocker API,或者知道可能阻塞的任务数量小于池的 ForkJoinPool.getParallelism 级别,否则池不能保证有足够的线程以确保进展或良好的性能。 等待任务完成并提取结果的主要方法是 join,但有几种变体:Future.get 方法支持可中断和/或定时等待完成,并使用 Future 约定报告结果。invoke 方法在语义上等同于 fork(); join(),但始终尝试在当前线程中开始执行。这些方法的“安静”形式不提取结果或报告异常。当一组任务正在执行时,可能需要延迟处理结果或异常,直到全部完成,这些形式可能很有用。方法invokeAll(有多个版本)执行最常见的并行调用形式:分叉一组任务并将它们全部合并。

在最常见的使用方式中,fork-join 对就像并行递归函数的调用(fork)和返回(join)。与其他形式的递归调用一样,应该先执行内部的返回(join)。例如,a.fork(); b.fork(); b.join(); a.join(); 比先合并 a 再合并 b 更有效率。 任务的执行状态可以在多个细节级别查询:isDone 为 true,如果任务以任何方式完成(包括任务在没有执行的情况下被取消的情况);isCompletedNormally 为 true,如果任务在没有取消或遇到异常的情况下完成;isCancelled 为 true,如果任务被取消(此时 getException 返回一个 CancellationException);isCompletedAbnormally 为 true,如果任务被取消或遇到异常,此时 getException 将返回遇到的异常或 CancellationException。

ForkJoinTask 类通常不直接子类化。相反,您子类化支持特定形式的 fork/join 处理的抽象类之一,通常是大多数不返回结果的计算的 RecursiveAction,返回结果的 RecursiveTask,以及完成的操作触发其他操作的 CountedCompleter。通常,具体的 ForkJoinTask 子类声明了组成其参数的字段,在构造函数中建立,然后定义一个使用此基类提供的控制方法以某种方式使用参数的 compute 方法。

method join 和它的变体仅在完成依赖关系是非环的情况下适用,也就是说,并行计算可以描述为有向无环图(DAG)。否则,执行可能会遇到某种形式的死锁,因为任务互相等待。然而,这个框架支持其他方法和技术(例如使用 Phaser、helpQuiesce 和 complete),这些方法可用于构造特殊子类以解决不是静态结构为 DAG 的问题。为了支持这样的用法,可以使用 setForkJoinTaskTag 或 compareAndSetForkJoinTaskTag 以短整数的值原子地对 ForkJoinTask 进行标记,并使用 getForkJoinTaskTag 进行检查。ForkJoinTask 实现不使用这些受保护的方法或标记,但它们可用于构造专门的子类。例如,并行图遍历可以使用提供的方法来避免重新访问已处理过的节点/任务。(标记的方法名称繁琐,部分是为了鼓励定义反映其使用模式的方法。)

大多数基础支持方法是final的,以防止覆盖与基础轻量级任务调度框架内在相关的实现。开发人员创建新的fork / join处理基本样式时,应至少实现protected方法exec、setRawResult和getRawResult,同时引入一种可在其子类中实现的抽象计算方法,可能依赖于该类提供的其他受保护的方法。

ForkJoinTask应该执行相对较小的计算量。大任务应该分成较小的子任务,通常通过递归分解。作为一个非常粗略的经验法则,一个任务应该执行100个以上、10000个以下的基本计算步骤,并应避免无限循环。如果任务太大,并行不能提高吞吐量。如果太小,那么内存和内部任务维护开销可能会压倒处理。

此类为Runnable和Callable提供了adapt方法,在将ForkJoinTask与其他类型的任务混合执行时可能很有用。当所有任务都是这种形式时,请考虑使用在asyncMode中构造的池。

ForkJoinTask是Serializable,这使它们可以在扩展中使用,例如远程执行框架。在执行前或执行后序列化任务是明智的,但不是在执行期间。序列化不在执行期间依赖于此。

源码解析

ForkJoinTask 类的内部文档中有一个对 class ForkJoinPool 的概述性实现。ForkJoinTasks 主要负责在向 ForkJoinWorkerThread 和 ForkJoinPool 中的方法中进行转发时维护它们的“状态”字段。

该类的方法在某种程度上层次分明: (1) 基本状态维护 (2) 执行和等待完成 (3) 还报告结果的用户级方法。 因为本文件中导出的方法按 javadocs 中的方式排列,所以这有时很难看出来。

修订说明:“Aux”字段的使用取代了以前对一个表保存异常,以及同步块和监视器等待完成的依赖。

节点用于等待完成的线程或持有抛出的异常(从不两者兼有)。等待线程以Treiber堆栈风格的方式在前面预留节点。信号者分离和释放等待者。取消的等待者试图取消连接。

示例

public class SumTask extends RecursiveTask<Long> {
    private long[] numbers;
    private int from;
    private int to;

    public SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    /**
     * @return
     */
    @Override
    protected Long compute() {
        if (to - from < 10) { 
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        } else { // Otherwise continue splitting, call recursively
            int middle = (from + to) / 2;
            SumTask taskLeft = new SumTask(numbers, from, middle);
            SumTask taskRight = new SumTask(numbers, middle + 1, to);
            taskLeft.fork();
            taskRight.fork();
            return taskLeft.join() + taskRight.join();
        }
    }
}

  @Test
    public void testSumTask(){
      ForkJoinPool forkJoinPool = new ForkJoinPool();
      long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();
      // Here you can call the future returned by the submit method byFuture.getGet results
      Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));
      System.out.println("Number of active threads:"+forkJoinPool.getActiveThreadCount());
      forkJoinPool.shutdown();
      System.out.println("The end result:"+result);
      System.out.println("Tasks stolen:"+forkJoinPool.getStealCount());
  }

 

分享到:

专栏

类型标签

网站访问总量