ForkJoinPool 예제

ForkJoinPool

ForkJoinPool 은 Java7부터 사용가능한 Java Concurrency 툴이며, 동일한 작업을 여러개의 Sub Task로 분리(Fork)하여 각각 처리하고, 이를 최종적으로 합쳐서(Join) 결과를 만들어내는 방식이다.

ForkJoinPool에는 2가지 인터페이스를 제공한다. 

- RecursiveAction : 

> RecursiveAction은 결과를 반환하지 않는 태스크를 말한다. 특정 작업을 분할하고, 처리를 수행하면서 해당 작업을 내부에서 처리한다. 

예제 : 
package com.company.api_simulation;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class APICallRecursiveAction extends RecursiveAction {

    public static final int DIVIDE_COUNT = 5;

    List<RequestObject> requestObjects = null;

    public APICallRecursiveAction(List<RequestObject> requestObjects) {
        this.requestObjects = requestObjects;
    }

    @Override
    protected void compute() {
        if (requestObjects.size() > DIVIDE_COUNT) {
            ForkJoinTask.invokeAll(divideSubActions());
        }
        else {
            processAPICall(requestObjects);
        }
    }

    private void processAPICall(List<RequestObject> requestObjects) {
        requestObjects.forEach(System.out::println);
    }

    private List<APICallRecursiveAction> divideSubActions() {

        List<RequestObject> preList = requestObjects.subList(0, requestObjects.size() / 2);
        List<RequestObject> postList = requestObjects.subList(requestObjects.size() / 2, requestObjects.size());

        List<APICallRecursiveAction> list = new ArrayList<>();
        list.add(new APICallRecursiveAction(preList));
        list.add(new APICallRecursiveAction(postList));

        return list;

    }

    public static void main(String[] args) {
        List<RequestObject> lists = new ArrayList<>();
        lists.add(new RequestObject(1L, 1L, "First1"));
        lists.add(new RequestObject(2L, 1L, "First2"));
        lists.add(new RequestObject(10L, 5L, "First10"));
        lists.add(new RequestObject(6L, 3L, "First6"));
        lists.add(new RequestObject(3L, 2L, "First3"));
        lists.add(new RequestObject(4L, 2L, "First4"));
        lists.add(new RequestObject(11L, 6L, "First11"));
        lists.add(new RequestObject(8L, 4L, "First8"));
        lists.add(new RequestObject(5L, 3L, "First5"));
        lists.add(new RequestObject(7L, 4L, "First7"));
        lists.add(new RequestObject(9L, 5L, "First9"));
        lists.add(new RequestObject(12L, 6L, "First12"));

        APICallRecursiveAction action = new APICallRecursiveAction(lists);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        forkJoinPool.invoke(action);
    }
}

- RecursiveTask : 

> 기대하는 바와 같이 이 작업은 결과를 반환하는 태스크를 Fork와 Join으로 처리한다. 

예제 : 
package com.company;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {

    private int[] arr;

    private static final int THRESHOLD = 4;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
//                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }

    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();

        int[] arr = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        CustomRecursiveTask task = new CustomRecursiveTask(arr);

//        commonPool.execute(task);
//        Integer result = task.join();
//        System.out.println(result);

//        System.out.println(commonPool.invoke(task));

        System.out.println(task.compute());
    }
}

결론적으로

Fork Join Pool은 우리가 원하는 작업을 Divided and Conquer 방식으로 처리할 수 있도록 해준다. 

실제 프로그램에서 사용할때에는 외부 API 를 여러번 호출해야하는 상황에서 작업을 분리하여 호출할 수도 있을 것이다. 

Share this

Related Posts

Previous
Next Post »