The Fork/Join framework makes it easy for Java developers to take advantage of the parallel execution supported by multicore processors. This API was added to the java.util.concurrent package in JDK 7, and it has been improved in subsequent versions of the JDK. It frees the developer from the heavy-duty work of handling threads manually: threads are created and managed automatically, while the computational power of multiple processors is used to boost performance. This post introduces the key aspects of this framework from a practical perspective.

A decorated tree illuminating the night in Stockholm (not a tree of threads)

Concurrency vs. Parallelism

First, let’s clarify the difference between concurrent and parallel execution in the context of a Java application.

  • Concurrent execution: All the threads are executed on the same CPU. This means that the application does not make progress on more than one task at the same instant, i.e., the tasks are executed in an interleaved fashion, sharing CPU processing time during execution.
  • Parallel execution: Each thread is executed on a separate CPU core. Therefore, the tasks make progress independently, and the application advances on more than one task simultaneously.

To achieve parallelism, the Java application must have more than one thread running, and each thread must run on a separate CPU, CPU core, GPU, or similar processing unit.
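As a rough illustration of the requirement above, the following sketch asks the JVM how many processors it can use and starts two threads. The class name ParallelismCheck and its helper method are made up for this example. Whether the two threads really run in parallel depends on the operating system scheduler and on the number of available cores.

```java
class ParallelismCheck {
  // Number of processors the JVM reports as available for scheduling threads.
  static int availableCores() {
    return Runtime.getRuntime().availableProcessors();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Available processors: " + availableCores());

    // Two independent threads: with a single core they can only interleave,
    // with two or more cores they may run at the same time.
    Runnable work = () ->
        System.out.println("Running on " + Thread.currentThread().getName());
    Thread first = new Thread(work, "worker-1");
    Thread second = new Thread(work, "worker-2");
    first.start();
    second.start();

    // Wait for both threads to finish before exiting.
    first.join();
    second.join();
  }
}
```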

The Fork/Join framework

The Fork/Join framework provides APIs that help Java developers solve parallelizable tasks recursively and efficiently. The approach is a parallel version of the “divide and conquer” technique: the problem is split into smaller independent parts, and each part is split further until it is small enough to be solved directly (this splitting is called fork). All the parts are then solved in parallel. Once they are complete, the partial results are joined together to produce the final result. The following pseudocode describes the process:

if (my portion of the work is small enough)
  solve the problem directly (sequential algorithm)
else
  split the problem into independent parts
  fork new sub-tasks to solve each part
  join all sub-tasks
  compose the result from sub-results
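The pseudocode above could be sketched in Java roughly as follows. This is only an illustration: the SumTask class, its cutoff of 1,000 elements, and the sum() helper are assumptions made for this example. It forks one half and computes the other half in the current thread, a common variant of "fork each part, then join".

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1_000; // assumed cutoff, tune empirically
  private final long[] data;
  private final int from; // inclusive
  private final int to;   // exclusive

  SumTask(long[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      // Small enough: solve the problem directly with a sequential loop.
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += data[i];
      }
      return sum;
    }
    // Split the range into two independent halves.
    int mid = from + (to - from) / 2;
    SumTask left = new SumTask(data, from, mid);
    SumTask right = new SumTask(data, mid, to);
    left.fork();                      // schedule the left half asynchronously
    long rightSum = right.compute();  // solve the right half in this thread
    long leftSum = left.join();       // wait for the forked half
    return leftSum + rightSum;        // compose the result from sub-results
  }

  // Convenience helper: runs the task in a fresh pool and shuts the pool down.
  static long sum(long[] data) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new SumTask(data, 0, data.length));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    long[] data = new long[10_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i + 1;
    }
    System.out.println(sum(data)); // prints 50005000 (the sum of 1..10000)
  }
}
```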

You need to know about four classes to use the framework:

  • ForkJoinPool: An executor dedicated to running instances of ForkJoinTask<V>. It implements a work-stealing algorithm to balance the load among threads: if a worker thread runs out of things to do, it can steal tasks from other threads that are still busy.
  • ForkJoinTask<V>: An abstract class that defines a task that runs within a ForkJoinPool.
  • RecursiveAction: A ForkJoinTask subclass for tasks that don’t return values.
  • RecursiveTask<V>: A ForkJoinTask subclass for tasks that return values.

In practice, all you need to do is create a subclass of RecursiveAction or RecursiveTask<V> and override the abstract method compute(). Then, submit the task to the ForkJoinPool, which handles everything from thread management to the utilization of the multicore processor. Note that a RecursiveAction does not return anything, while a RecursiveTask<V> returns an object of the specified type.

If you are curious, here is the class diagram for this architecture. As you can observe, the ForkJoinPool class extends the AbstractExecutorService class, which implements ExecutorService, while the ForkJoinTask<V> abstract class implements the Future<V> interface.

Fork/Join Class Diagram
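The relationships described above can also be checked in code. In this minimal sketch, the HierarchyDemo class and its trivial ConstantTask are invented for illustration. The assignments compile only because ForkJoinPool is an ExecutorService and because the ForkJoinTask<Integer> returned by submit() is a Future<Integer>.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class HierarchyDemo {
  // Hypothetical task that just returns a constant.
  static class ConstantTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
      return 42;
    }
  }

  static int runAsFuture() {
    ForkJoinPool pool = new ForkJoinPool();
    ExecutorService executor = pool; // upcast: ForkJoinPool is an ExecutorService
    try {
      // submit(ForkJoinTask<T>) returns the task itself, which is a Future<T>.
      Future<Integer> future = pool.submit(new ConstantTask());
      return future.get(); // blocks until the task has completed
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown(); // shutdown() is declared by ExecutorService
    }
  }

  public static void main(String[] args) {
    System.out.println(runAsFuture()); // prints 42
  }
}
```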

Examples

Let’s see two examples that illustrate how you can use the Fork/Join framework.

RecursiveAction

Task: You want to replace every occurrence of a particular value in an array of integers. The value may appear in the array more than once.

This task lends itself to parallel execution because the array can be split and the element replaced in each part independently. The following ForkJoinReplaceAction class extends the RecursiveAction class to solve this problem. The void method compute() is in charge of performing the task in parallel. When the part it receives is no larger than the specified THRESHOLD, it calls the process() method, which replaces the element with -1 in that part of the array. Otherwise, it calls the static invokeAll method of the ForkJoinTask class with a list of two ForkJoinReplaceAction instances, each covering one half of the current range.

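import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

public class ForkJoinReplaceAction extends RecursiveAction {
  private final int element;
  private final int[] array;
  private final int THRESHOLD;
  private final int ini;
  private final int end;

  // The task covers the half-open range [ini, end) of the array
  public ForkJoinReplaceAction(int element, int[] array, int THRESHOLD, int ini, int end) {
    this.element = element;
    this.array = array;
    this.THRESHOLD = THRESHOLD;
    this.ini = ini;
    this.end = end;
  }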

  @Override
  protected void compute() {
    if (end - ini <= THRESHOLD) {
      process(element, array, ini, end);
    } else {
      ForkJoinTask.invokeAll(createSubTasks());
    }
  }

  private List<ForkJoinReplaceAction> createSubTasks() {
    List<ForkJoinReplaceAction> taskList = new ArrayList<>();
    taskList.add(
        new ForkJoinReplaceAction(element, array, THRESHOLD, ini, (ini + end) / 2)
    );
    taskList.add(
        new ForkJoinReplaceAction(element, array, THRESHOLD, (ini + end) / 2, end)
    );
    return taskList;
  }
  
  private void process(int element, int[] array, int ini, int end) {
    for (int i = ini; i < end; i++) {
      if (array[i] == element) {
        array[i] = -1;
      }
    }
  }
}

To execute the ForkJoinReplaceAction, create an instance of ForkJoinPool and call its invoke() method. The default constructor configures the pool with a parallelism level equal to the number of processors available to the JVM (i.e., the value returned by Runtime.getRuntime().availableProcessors()). The following code prints 1 2 -1 -1 -1 4 5 6 6 7, because each entry of the value 3 is replaced by -1 in the array:

ForkJoinPool pool = new ForkJoinPool();
int[] arr = new int[]{1, 2, 3, 3, 3, 4, 5, 6, 6, 7};
pool.invoke(new ForkJoinReplaceAction(3, arr, 2, 0, arr.length));
Arrays.stream(arr).forEach(value -> System.out.print(value + " "));
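To see how the pool is sized, you can query its parallelism level. The sketch below uses a made-up PoolParallelism class to compare the default constructor with the constructor that takes an explicit parallelism level. The number printed for the default pool depends on the machine running the code.

```java
import java.util.concurrent.ForkJoinPool;

class PoolParallelism {
  // Parallelism of a pool created with the default constructor.
  static int defaultParallelism() {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  // Parallelism of a pool created with an explicit target level.
  static int explicitParallelism(int level) {
    ForkJoinPool pool = new ForkJoinPool(level);
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
    System.out.println("Default pool parallelism: " + defaultParallelism());
    System.out.println("Explicit pool parallelism: " + explicitParallelism(2));
  }
}
```

An explicit level can be useful when the pool should leave some cores free for other work.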

RecursiveTask

Task: You want to count the occurrences of a particular value in an array of integers.

As in the previous example, this task lends itself to parallel execution. The class ForkJoinCountTask extends the RecursiveTask<Integer> class to solve this problem. The compute() method returns the result. If the part it receives is no larger than the specified THRESHOLD, it calls the process() method, which counts the number of times the element is present in that part of the array. Otherwise, it calls the static invokeAll method of the ForkJoinTask class with a list of two ForkJoinCountTask instances obtained from the createSubTasks() method, each covering one half of the current range. Note that the partial results are joined and summed using the Java Stream API.

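import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinCountTask extends RecursiveTask<Integer> {
  private final int element;
  private final int[] array;
  private final int THRESHOLD;
  private final int ini;
  private final int end;

  // The task covers the half-open range [ini, end) of the array
  public ForkJoinCountTask(int element, int[] array, int THRESHOLD, int ini, int end) {
    this.element = element;
    this.array = array;
    this.THRESHOLD = THRESHOLD;
    this.ini = ini;
    this.end = end;
  }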
  
  @Override
  protected Integer compute() {
    if (end - ini <= THRESHOLD) {
      return process(element, array, ini, end);
    } else {
      return ForkJoinTask.invokeAll(createSubTasks())
          .stream()
          .mapToInt(ForkJoinTask::join)
          .sum();
    }
  }
  
  private List<ForkJoinCountTask> createSubTasks() {
    List<ForkJoinCountTask> taskList = new ArrayList<>();
    taskList.add(new ForkJoinCountTask(element, array, THRESHOLD, ini, (ini + end) / 2));
    taskList.add(new ForkJoinCountTask(element, array, THRESHOLD, (ini + end) / 2, end));
    return taskList;
  }
  
  private Integer process(int element, int[] array, int ini, int end) {
    int count = 0;
    for (int i = ini; i < end; i++) {
      if (array[i] == element) {
        count++;
      }
    }
    return count;
  }
}

As in the previous example, to execute the ForkJoinCountTask, create an instance of ForkJoinPool and use the invoke() method. The following code prints 3, because there are three entries of this element in the array:

var pool = new ForkJoinPool();
int[] arr = new int[]{1, 2, 3, 3, 3, 4, 5, 6, 6, 7};
System.out.println(pool.invoke(new ForkJoinCountTask(3, arr, 2, 0, arr.length)));

Conclusion

The Fork/Join framework simplifies parallel programming for Java developers. The ForkJoinPool executes ForkJoinTask instances on worker threads that can run on separate processing cores, and threads that finish early can steal tasks from other threads that are still busy. It’s worth mentioning that debugging parallel implementations can be tricky. Never take for granted that a computation using the Fork/Join framework is faster than a sequential variant: always measure and compare the performance empirically. Likewise, it is best to choose the threshold used to split the task into smaller subtasks experimentally.
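One simple way to start measuring is sketched below. It times a sequential count against a Fork/Join count for a few candidate thresholds. The ThresholdTiming class, its nested CountTask, the array size, and the threshold values are all assumptions made for this example. Single-shot timings like these ignore JIT warm-up and other noise, so treat the numbers as a first indication only; a benchmarking harness such as JMH gives more reliable results.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class ThresholdTiming {
  // Hypothetical counting task, similar in spirit to ForkJoinCountTask.
  static class CountTask extends RecursiveTask<Integer> {
    private final int[] array;
    private final int element, threshold, ini, end;

    CountTask(int[] array, int element, int threshold, int ini, int end) {
      this.array = array;
      this.element = element;
      this.threshold = threshold;
      this.ini = ini;
      this.end = end;
    }

    @Override
    protected Integer compute() {
      if (end - ini <= threshold) {
        return countSequential(array, element, ini, end);
      }
      int mid = ini + (end - ini) / 2;
      CountTask left = new CountTask(array, element, threshold, ini, mid);
      CountTask right = new CountTask(array, element, threshold, mid, end);
      ForkJoinTask.invokeAll(left, right); // run both halves and wait for them
      return left.join() + right.join();
    }
  }

  // Counts occurrences of element in the half-open range [ini, end).
  static int countSequential(int[] array, int element, int ini, int end) {
    int count = 0;
    for (int i = ini; i < end; i++) {
      if (array[i] == element) {
        count++;
      }
    }
    return count;
  }

  // The threshold must be at least 1, otherwise the splitting never stops.
  static int countParallel(ForkJoinPool pool, int[] array, int element, int threshold) {
    return pool.invoke(new CountTask(array, element, threshold, 0, array.length));
  }

  public static void main(String[] args) {
    int[] data = new int[2_000_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i % 10;
    }
    ForkJoinPool pool = new ForkJoinPool();
    try {
      long start = System.nanoTime();
      int expected = countSequential(data, 3, 0, data.length);
      long micros = (System.nanoTime() - start) / 1_000;
      System.out.println("sequential: count=" + expected + ", " + micros + " us");

      for (int threshold : new int[] {1_000, 10_000, 100_000}) {
        start = System.nanoTime();
        int result = countParallel(pool, data, 3, threshold);
        micros = (System.nanoTime() - start) / 1_000;
        System.out.println("threshold " + threshold + ": count=" + result + ", " + micros + " us");
      }
    } finally {
      pool.shutdown();
    }
  }
}
```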
