JUC系列(七) ForkJion任务拆分与异步回调
创始人
2024-03-13 19:40:28
0

📣 📣 📣 📢📢📢
☀️☀️你好啊!小伙伴,我是小冷。是一个兴趣驱动自学练习两年半的的Java工程师。
📒 一位十分喜欢将知识分享出来的Java博主⭐️⭐️⭐️,擅长使用Java技术开发web项目和工具
📒 文章内容丰富:覆盖大部分java必学技术栈,前端,计算机基础,容器等方面的文章
📒 如果你也对Java感兴趣,关注小冷吧,一起探索Java技术的生态与进步,一起讨论Java技术的使用与学习
✏️高质量技术专栏专栏链接: 微服务数据结构netty,单点登录,SSMSpringCloudAlibaba
😝公众号😝想全栈的小冷,分享一些技术上的文章,以及解决问题的经验
当前专栏JUC系列

ForkJion

什么是ForkJoin

ForkJoin 下 JDK 1.7 并行执行任务的,数量越大,效率越高

比如 :大数据 Map Reduce(把大任务拆分成小任务)

image-20220304004113183

ForkJoin 特点: 工作窃取

举例子:

PS: 维护的是双端队列 Deuue

A线程执行任务到 第二个

B线程执行完毕,那么B线程回去讲A线程的东西拿来执行,从而提高效率

image-20220304004235249

认识forkjion

ForkJoin 使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

image-20220304005022113

代码实例

task 类 里面编写的是我们继承了 递归任务继承的实现方法

public class forkjoinDemo extends RecursiveTask {/* 解决方案 也是有三六九等的,比如案例 求和* 最低等 就是直接for循环求和* 中等 使用forkjion* 高等 stream 并行流* */
//开始private long start;//结束private long end;//到多少值,才开始分开任务private long threshold = 10000L;public forkjoinDemo(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {//判断超过阈值的时候 开始使用 fork joinif (end - start > threshold) {long sum = 0L;for (long i = start; i <= end; i++) {sum += i;}return sum;} else {//    求出中间值long mid = (start - end) / 2;forkjoinDemo task1 = new forkjoinDemo(start, mid);//拆分任务,把任务压入线程队列task1.fork();forkjoinDemo task2 = new forkjoinDemo(mid + 1, end);task2.fork();return task1.join() + task2.join();}}
}

测试类

三种方法的速度

public class test {public static void main(String[] args) throws ExecutionException, InterruptedException {//test1(); 7042;//test2(); 969//test3(); 179;}public static void test1() {Long sum = 0L;long start = System.currentTimeMillis();for (long i = 1L; i <= 10_0000_0000; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("sum" + sum + "=> 执行时间" + (end - start));}public static void test2() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask task = new forkjoinDemo(0L, 10_0000_0000L);ForkJoinTask submit = forkJoinPool.submit(task);Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("sum" + sum + "=> 执行时间" + (end - start));}public static void test3() {long start = System.currentTimeMillis();//并行流long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);long end = System.currentTimeMillis();System.out.println("sum" + reduce + "=> 执行时间" + (end - start));}
}

异步回调

什么是future

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

image-20220304014056949

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

Future接口的局限性

当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程。即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。

image-20220304014316398

我们需要新的,更强大的拓展,CompletableFuture

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

实例化:

有两种格式,一种是supply开头的方法,一种是run开头的方法

  • supply开头:这种方法,可以返回异步线程执行之后的结果
  • run开头:这种不会返回结果,就只是执行线程任务
public static  CompletableFuture supplyAsync(Supplier supplier);
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor);public static CompletableFuture runAsync(Runnable runnable);
public static CompletableFuture runAsync(Runnable runnable, Executor executor);

获取结果

同步获取结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

简单的例子

CompletableFuture future = new CompletableFuture<>();
Integer integer = future.get();

get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。有兴趣的小伙伴可以打个断点看看,状态会一直是not completed

代码使用案例

 public static void main(String[] args) throws ExecutionException, InterruptedException {没有返回值的异步回调, runAsync//CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {//    System.out.println(Thread.currentThread().getName() + "runAsync=> Void");//});//System.out.println("1111");获取执行结果//completableFuture.get();//    有返回值的CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "runAsync=>integer");int i = 10 / 0;return 1024;});completableFuture.whenComplete((t, u) -> {//t是正常的返回结果//u是返回报错信息System.out.println("t=>" + t);System.out.println("u=>" + u);}).exceptionally((e) -> {System.out.println(e.getMessage());return 233;}).get();}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...