(十五)ForkJoin框架
创始人
2024-05-16 10:47:38
0

ForkJoinPool

ForkJoinPool是一种“分治算法”的多线程并行计算框架,自Java7引入。它将一个大的任务分为若干个子任务,这些子任务分别计算,然后合并出最终结果。
ForkJoinPool比普通的线程池可以更好地实现计算的负载均衡,提高资源利用率。

创建ForkJoinPool

构造方法

共有三个public的构造方法,最多的有4个参数,分别是并行度、工作线程工厂,线程未捕获异常的处理器、工作队列模式(FIFO或LIFO,默认是LIFO)、工作线程名称前缀。一般在使用无参或一个参数的构造方法即可,(或者使用commonPool),如果需要定制线程继承ForkJoinWorkerThread,则使用4个参数的构造方法。
//ForkJoinPool.commonPool();
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode, String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

RecursiveAction和RecursiveTask

这两个类都是ForkJoinTask子类,用于实现子任务的逻辑。区别是前者没有返回值,后者有返回值。使用时,针对不同类型的任务,可以分别继承这两个类,实现其compute方法。

使用

案例1(RecursiveAction):快速排序

基本思想:
1、利用数组的某个元素(一般取第一个元素)把数组划分成两半,左边子数组里面的元素小于等于该元素,右边子数组里面的元素大于等于该元素。
2、对左右的2个子数组分别排序。
将数组划分为两部分后,对子数组分别排序是独立的子问题,这个过程可以递归分解子问题,所以可以利用多个线程分别为子数组排序。
package com.example.demo;import org.junit.Test;import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;public class ForkJoinTest {ForkJoinPool pool = ForkJoinPool.commonPool();SecureRandom random = new SecureRandom();@Testpublic void testSort() throws ExecutionException, InterruptedException {StringJoiner before = new StringJoiner(",","[","]");//20个数的数组int[] arr = getRandomIntArray(20);Arrays.stream(arr).mapToObj(String::valueOf).forEach(before::add);System.out.println(before.toString());Instant start = Instant.now();ForkJoinTask task = pool.submit(new QuickSortTask(arr));//阻塞直到完成排序task.get();long i = Duration.between(start,Instant.now()).get(ChronoUnit.NANOS);System.out.println("排序时间:" + i + "纳秒");StringJoiner after = new StringJoiner(",","[","]");Arrays.stream(arr).mapToObj(String::valueOf).forEach(after::add);System.out.println(after.toString());}private int[] getRandomIntArray(int count) {int bound = count * 10;int[] array = new int[count];for (int i = 0; i < count; i++){array[i] = random.nextInt(bound);}return array;}public static class QuickSortTask extends RecursiveAction {private int start;private int end;private int[] array;public QuickSortTask(int[] array){this.array = array;this.start = 0;this.end = array.length-1;}public QuickSortTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected void compute() {int mid = part(array, start, end);//当左边还有元素时if (mid != start) {QuickSortTask task1 = new QuickSortTask(array, start, mid - 1);task1.fork();task1.join();}//当右边还有元素时if (mid != end) {QuickSortTask task2 = new QuickSortTask(array, mid + 1, end);task2.fork();task2.join();}}/*** 

返回基准值的下标,基准值左的元素都小于等于基准值,基准值右的元素大于等于基准值

* @param array* @param start* @param end* @return*/private int part(int[] array, int start, int end) {int i = start, j = end;//基准值的下标int base = start;//左右扫描相遇时结束while (i < j) {//从右向左扫描,如果当前值比基准值小,则置换,已经置换过的元素不再扫描(j的右边)while (i < j && array[j] >= array[base]) {j--;}if (i < j) {swap(array, j, base);base = j;}//从左向右扫描,如果当前值比基准值大,则置换,已经置换过的元素不再扫描(i的左边)while (i < j && array[i] <= array[base]) {i++;}if (i < j) {swap(array,i, base);base = i;}}return base;}private void swap(int[] array, int x, int y) {if (x != y) {int temp = array[x];array[x] = array[y];array[y] = temp;}}} }

案例2(RecursiveTask):求和

例如从1加到100,如果不用高斯的方法,可以用程序实现累加,将数拆分成小组,每个小组互相独立,每个小组组内分别相加,最后把组的结果相加,这个过程可以使用ForkJoin。
RecursiveTask
package com.example.demo;import org.junit.Test;import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;public class ForkJoinTest {ForkJoinPool pool = ForkJoinPool.commonPool();@Testpublic void testSum() throws ExecutionException, InterruptedException {ForkJoinTask submit = pool.submit(new SumTask(1, 100));System.out.println(submit.get());}public static class SumTask extends RecursiveTask{private int startNum;private int endNum;//决定当子任务处理的元素个数小于此值时不再切分任务,直接进行计算private static final int THRESHOLD = 10;public SumTask(int startNum, int endNum){this.startNum = startNum;this.endNum = endNum;}@Overrideprotected Integer compute() {int sum = 0;if (endNum - startNum + 1 < THRESHOLD){for (int i = startNum; i <= endNum; i++){sum += i;}return sum;}int mid = split(startNum, endNum);SumTask task1 = new SumTask(startNum, mid);SumTask task2 = new SumTask(mid + 1, endNum);ForkJoinTask fork1 = task1.fork();ForkJoinTask fork2 = task2.fork();sum = fork1.join() + fork2.join();return sum;}private int split(int startNum, int endNum) {return (startNum + endNum)/2;}}
}

关闭

和ThreadPoolExecutor一样,ForkJoinPool使用完也要关闭,依然是使用shutdown和shutdownNow方法,shutdown只拒绝新提交的任务;shutdownNow会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。
public void shutdown();
public List shutdownNow();
ForkJoinPool pl=new ForkJoinPool();
try {boolean flag;do {flag = pl.awaitTermination(500,TimeUnit.MILLISECONDS);} while (!flag);
} catch (Exception e){e.printStackTrace();
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...