ForkJoinPool
ForkJoinPool 은 Java7부터 사용가능한 Java Concurrency 툴이며, 동일한 작업을 여러개의 Sub Task로 분리(Fork)하여 각각 처리하고, 이를 최종적으로 합쳐서(Join) 결과를 만들어내는 방식이다.
ForkJoinPool에는 2가지 인터페이스를 제공한다.
- RecursiveAction :
> RecursiveAction은 결과를 반환하지 않는 태스크를 말한다. 특정 작업을 분할하고, 처리를 수행하면서 해당 작업을 내부에서 처리한다.
예제 :
package com.company.api_simulation;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class APICallRecursiveAction extends RecursiveAction {
public static final int DIVIDE_COUNT = 5;
List<RequestObject> requestObjects = null;
public APICallRecursiveAction(List<RequestObject> requestObjects) {
this.requestObjects = requestObjects;
}
@Override
protected void compute() {
if (requestObjects.size() > DIVIDE_COUNT) {
ForkJoinTask.invokeAll(divideSubActions());
}
else {
processAPICall(requestObjects);
}
}
private void processAPICall(List<RequestObject> requestObjects) {
requestObjects.forEach(System.out::println);
}
private List<APICallRecursiveAction> divideSubActions() {
List<RequestObject> preList = requestObjects.subList(0, requestObjects.size() / 2);
List<RequestObject> postList = requestObjects.subList(requestObjects.size() / 2, requestObjects.size());
List<APICallRecursiveAction> list = new ArrayList<>();
list.add(new APICallRecursiveAction(preList));
list.add(new APICallRecursiveAction(postList));
return list;
}
public static void main(String[] args) {
List<RequestObject> lists = new ArrayList<>();
lists.add(new RequestObject(1L, 1L, "First1"));
lists.add(new RequestObject(2L, 1L, "First2"));
lists.add(new RequestObject(10L, 5L, "First10"));
lists.add(new RequestObject(6L, 3L, "First6"));
lists.add(new RequestObject(3L, 2L, "First3"));
lists.add(new RequestObject(4L, 2L, "First4"));
lists.add(new RequestObject(11L, 6L, "First11"));
lists.add(new RequestObject(8L, 4L, "First8"));
lists.add(new RequestObject(5L, 3L, "First5"));
lists.add(new RequestObject(7L, 4L, "First7"));
lists.add(new RequestObject(9L, 5L, "First9"));
lists.add(new RequestObject(12L, 6L, "First12"));
APICallRecursiveAction action = new APICallRecursiveAction(lists);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
forkJoinPool.invoke(action);
}
}
- RecursiveTask :
> 기대하는 바와 같이 이 작업은 결과를 반환하는 태스크를 Fork와 Join으로 처리한다.
예제 :
package com.company;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD = 4;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
// .filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
public static void main(String[] args) {
ForkJoinPool commonPool = ForkJoinPool.commonPool();
int[] arr = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
CustomRecursiveTask task = new CustomRecursiveTask(arr);
// commonPool.execute(task);
// Integer result = task.join();
// System.out.println(result);
// System.out.println(commonPool.invoke(task));
System.out.println(task.compute());
}
}
결론적으로
Fork Join Pool은 우리가 원하는 작업을 Divided and Conquer 방식으로 처리할 수 있도록 해준다.
실제 프로그램에서 사용할때에는 외부 API 를 여러번 호출해야하는 상황에서 작업을 분리하여 호출할 수도 있을 것이다.
EmoticonEmoticon