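CompletableFuture implements both the CompletionStage and Future interfaces. CompletionStage extends what Future offers with asynchronous callbacks, fluent chaining, and the ability to combine multiple futures, which makes coordinating several tasks in Java much smoother.
Future provides no notification mechanism: to find out whether a task has finished you either poll isDone() or call get(), which blocks the calling thread. CompletionStage solves this problem. When one task completes successfully, the next one is triggered automatically, with no waiting in between.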
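The contrast can be sketched as follows. This is a minimal illustration rather than code from the article: the class name CallbackVsPolling and both method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class CallbackVsPolling {

    // Plain Future: the caller must block on get() (or poll isDone()) to learn the result.
    static String withFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> "hello");
            return future.get().toUpperCase(); // blocks the calling thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: the follow-up step is registered as a callback and
    // runs automatically once the first step has produced its value.
    static CompletableFuture<String> withCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        System.out.println(withFuture());                   // HELLO
        System.out.println(withCompletableFuture().join()); // HELLO
    }
}
```

The join() in main is there only so the program can print the value; the upper-casing step itself is triggered by the completion of the first stage.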
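supplyAsync
// Builds a task from the supplier and runs it on the default built-in pool, ForkJoinPool.commonPool()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// Builds a task from the supplier and runs it on the given executor (a custom thread pool)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync
// Builds a task from the runnable and runs it on the default built-in pool, ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable)
// Builds a task from the runnable and runs it on the given executor (a custom thread pool)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)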
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test {

    // Every thread in this pool is named "fish", so the log shows which pool ran a task
    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        // runAsync takes a Runnable, so the future carries no value and join() returns null
        CompletableFuture<Void> runFuture =
                CompletableFuture.runAsync(() -> userService.compute("runAsync"), fishExecutor);
        // supplyAsync takes a Supplier, so the future carries the value returned by compute()
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("supplyAsync"), fishExecutor);
        System.out.println(runFuture.join());
        System.out.println(supplyFuture.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }
    }
}
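Differences between join and get
What they have in common
1. Both join() and get() return the result of the CompletableFuture once the asynchronous task has completed.
How they differ
1. join() throws only unchecked exceptions (a CompletionException wrapping the original failure), so the compiler does not force you to handle anything.
2. get() throws checked exceptions, ExecutionException and InterruptedException, which must be declared with throws or handled with try-catch.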
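A small sketch of the difference, assuming a task that always fails. The class name JoinVsGet and the helper methods are illustrative and not part of the article's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical class name, used only for this illustration.
class JoinVsGet {

    // A task that always fails, so both retrieval styles can be compared.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join(): no checked exceptions; the failure arrives as an unchecked CompletionException.
    static String describeJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: " + e.getClass().getSimpleName() + " caused by " + e.getCause().getMessage();
        }
    }

    // get(): the compiler forces handling of InterruptedException and ExecutionException.
    static String describeGet() {
        try {
            return failing().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "get: interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getClass().getSimpleName() + " caused by " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeJoin()); // join: CompletionException caused by boom
        System.out.println(describeGet());  // get: ExecutionException caused by boom
    }
}
```

In both cases the original exception is available through getCause(); only the wrapper type and whether it is checked differ.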
Output of the Test class:
16:45:22.054 [fish] INFO com.example.mavendemo.Test$UserService - runAsync
16:45:22.054 [fish] INFO com.example.mavendemo.Test$UserService - supplyAsync
null
supplyAsync
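thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
These methods run a second task (the Runnable action) after the first one finishes: the action is invoked as a callback once the previous task completes. No value is passed between the two tasks, and the second task returns no result.
thenRun does not submit the second task to a pool of its own. It usually runs on the thread that completed the first task, and therefore in the first task's pool; if the first task has already finished when thenRun is called, it runs on the calling thread. The exact thread is not guaranteed.
thenRunAsync runs the second task on ForkJoinPool.commonPool(), or on the executor you pass in.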
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test2 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    // This field was missing from the original listing, which did not compile without it
    static UserService userService = new UserService();

    public static void main(String[] args) {
        // The messages mean "first task", "second task" and "third task"
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("第一个任务"), fishExecutor);
        // thenRun: no executor is involved, see the thread name in the log
        CompletableFuture<Void> voidCompletableFuture1 =
                supplyFuture.thenRun(() -> userService.noResult("第二个任务"));
        // thenRunAsync without an executor: runs on ForkJoinPool.commonPool()
        CompletableFuture<Void> voidCompletableFuture2 =
                supplyFuture.thenRunAsync(() -> userService.noResult("第三个任务"));
        System.out.println(voidCompletableFuture1.join());
        System.out.println(voidCompletableFuture2.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
16:45:48.526 [fish] INFO com.example.mavendemo.Test2$UserService - 第一个任务
16:45:48.529 [fish] INFO com.example.mavendemo.Test2$UserService - 第二个任务
null
16:45:48.530 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test2$UserService - 第三个任务
null
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test2 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    // Second pool, with threads named "cat", passed explicitly to thenRunAsync below
    static ExecutorService catExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "cat"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("第一个任务"), fishExecutor);
        CompletableFuture<Void> voidCompletableFuture1 =
                supplyFuture.thenRun(() -> userService.noResult("第二个任务"));
        CompletableFuture<Void> voidCompletableFuture2 =
                supplyFuture.thenRunAsync(() -> userService.noResult("第三个任务"));
        // thenRunAsync with an executor: the fourth task ("第四个任务") runs on catExecutor
        CompletableFuture<Void> voidCompletableFuture3 =
                supplyFuture.thenRunAsync(() -> userService.noResult("第四个任务"), catExecutor);
        System.out.println(voidCompletableFuture1.join());
        System.out.println(voidCompletableFuture2.join());
        System.out.println(voidCompletableFuture3.join());
        fishExecutor.shutdown();
        catExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
16:47:34.132 [fish] INFO com.example.mavendemo.Test2$UserService - 第一个任务
16:47:34.134 [cat] INFO com.example.mavendemo.Test2$UserService - 第四个任务
16:47:34.134 [cat] INFO com.example.mavendemo.Test2$UserService - 第二个任务
null
16:47:34.135 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test2$UserService - 第三个任务
null
null
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test3 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        // thenAccept receives the previous result (a) but produces no value of its own
        CompletableFuture<Void> voidCompletableFuture1 =
                supplyFuture.thenAccept((a) -> userService.noResult("传入了前一个任务的结果:" + a));
        // thenAcceptAsync does the same on ForkJoinPool.commonPool()
        CompletableFuture<Void> voidCompletableFuture2 =
                supplyFuture.thenAcceptAsync((a) -> userService.noResult("传入了前一个任务的结果:" + a));
        System.out.println(voidCompletableFuture1.join());
        System.out.println(voidCompletableFuture2.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
17:01:09.637 [fish] INFO com.example.mavendemo.Test3$UserService - 这是第一个任务
17:01:09.639 [fish] INFO com.example.mavendemo.Test3$UserService - 传入了前一个任务的结果:这是第一个任务
17:01:09.639 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test3$UserService - 传入了前一个任务的结果:这是第一个任务
null
null
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test4 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        // thenApply receives the previous result (a) and returns a new value
        CompletableFuture<String> applyFuture1 =
                supplyFuture.thenApply((a) -> userService.compute("传入了前一个任务的结果:" + a));
        // thenApplyAsync does the same on ForkJoinPool.commonPool()
        CompletableFuture<String> applyFuture2 =
                supplyFuture.thenApplyAsync((a) -> userService.compute("传入了前一个任务的结果:" + a));
        System.out.println(applyFuture1.join());
        System.out.println(applyFuture2.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
17:06:08.946 [fish] INFO com.example.mavendemo.Test4$UserService - 这是第一个任务
17:06:08.948 [fish] INFO com.example.mavendemo.Test4$UserService - 传入了前一个任务的结果:这是第一个任务
传入了前一个任务的结果:这是第一个任务
17:06:08.948 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test4$UserService - 传入了前一个任务的结果:这是第一个任务
传入了前一个任务的结果:这是第一个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test5 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.exceptionMethod("这是第一个任务"), fishExecutor);
        // exceptionally runs only when the previous stage failed; its return value replaces the result
        CompletableFuture<String> exceptionally =
                supplyFuture.exceptionally(o -> userService.catchException(o));
        System.out.println(exceptionally.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String exceptionMethod(String message) {
            log.info(message);
            throw new RuntimeException(message);
        }

        public String catchException(Throwable e) {
            // e is the CompletionException that wraps the original RuntimeException
            log.error("{}", e);
            return e.getMessage();
        }
    }
}
17:37:53.901 [fish] INFO com.example.mavendemo.Test5$UserService - 这是第一个任务
17:37:53.907 [fish] ERROR com.example.mavendemo.Test5$UserService - {}
java.util.concurrent.CompletionException: java.lang.RuntimeException: 这是第一个任务
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 这是第一个任务
    at com.example.mavendemo.Test5$UserService.exceptionMethod(Test5.java:27)
    at com.example.mavendemo.Test5.lambda$main$1(Test5.java:16)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 3 common frames omitted
java.lang.RuntimeException: 这是第一个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test6 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        // whenComplete sees the result and the exception, but the returned future
        // keeps the ORIGINAL result: the callback's own return value is discarded
        CompletableFuture<String> whenCompleteFuture1 =
                supplyFuture.whenComplete((a, throwable) -> userService.compute("传入了前一个任务的结果:" + a));
        CompletableFuture<String> whenCompleteFuture2 =
                supplyFuture.whenCompleteAsync((a, throwable) -> userService.compute("传入了前一个任务的结果:" + a));
        System.out.println(whenCompleteFuture1.join());
        System.out.println(whenCompleteFuture2.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
17:49:36.135 [fish] INFO com.example.mavendemo.Test6$UserService - 这是第一个任务
17:49:36.137 [fish] INFO com.example.mavendemo.Test6$UserService - 传入了前一个任务的结果:这是第一个任务
这是第一个任务
17:49:36.137 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test6$UserService - 传入了前一个任务的结果:这是第一个任务
这是第一个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test7 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        // handle also sees the result and the exception, but unlike whenComplete
        // the value it returns becomes the result of the new future
        CompletableFuture<String> handleFuture1 =
                supplyFuture.handle((a, throwable) -> userService.compute("传入了前一个任务的结果:" + a));
        CompletableFuture<String> handleFuture2 =
                supplyFuture.handleAsync((a, throwable) -> userService.compute("传入了前一个任务的结果:" + a));
        System.out.println(handleFuture1.join());
        System.out.println(handleFuture2.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public void noResult(String message) {
            log.info(message);
        }
    }
}
17:52:20.197 [fish] INFO com.example.mavendemo.Test7$UserService - 这是第一个任务
17:52:20.199 [fish] INFO com.example.mavendemo.Test7$UserService - 传入了前一个任务的结果:这是第一个任务
传入了前一个任务的结果:这是第一个任务
17:52:20.199 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test7$UserService - 传入了前一个任务的结果:这是第一个任务
传入了前一个任务的结果:这是第一个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test8 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
        // thenCombine waits for BOTH futures, then merges their two results
        CompletableFuture<String> thenCombine =
                first.thenCombine(second, (s, s2) -> userService.combine(s, s2));
        System.out.println(thenCombine.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public String combine(String first, String second) {
            log.info("这是combine方法");
            return first + second;
        }
    }
}
09:49:38.621 [fish] INFO com.example.mavendemo.Test8$UserService - 这是第二个任务
09:49:38.621 [fish] INFO com.example.mavendemo.Test8$UserService - 这是第一个任务
09:49:38.624 [fish] INFO com.example.mavendemo.Test8$UserService - 这是combine方法
这是第一个任务这是第二个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test9 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
        // applyToEither applies the function to whichever of the two results arrives first
        CompletableFuture<String> applyToEither =
                first.applyToEither(second, s -> userService.either(s));
        System.out.println(applyToEither.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public String either(String result) {
            log.info("这是either方法");
            return result;
        }
    }
}
10:02:51.523 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第一个任务
10:02:51.523 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第二个任务
10:02:51.526 [fish] INFO com.example.mavendemo.Test9$UserService - 这是either方法
这是第一个任务
Output of a second run, in which the second task finished first:
10:03:50.381 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第二个任务
10:03:50.381 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第一个任务
10:03:50.383 [fish] INFO com.example.mavendemo.Test9$UserService - 这是either方法
这是第二个任务
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test10 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) {
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        // The second task always throws ("出错了" means "something went wrong")
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.error("这是第二个任务"), fishExecutor);
        // allOf completes when ALL inputs have completed; it carries no value (Void)
        CompletableFuture<Void> allOfFuture =
                CompletableFuture.allOf(first, second).whenComplete((unused, throwable) -> {
                    System.out.println(unused);
                    // throwable is non-null here only because the second task fails
                    System.out.println(throwable.getMessage());
                    System.out.println("finish");
                });
        // join() rethrows the failure, which produces the stack trace in the output
        System.out.println(allOfFuture.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public String compute(String message) {
            log.info(message);
            return message;
        }

        public String error(String message) {
            log.info(message);
            throw new RuntimeException("出错了");
        }
    }
}
10:41:43.120 [fish] INFO com.example.mavendemo.Test10$UserService - 这是第二个任务
10:41:43.120 [fish] INFO com.example.mavendemo.Test10$UserService - 这是第一个任务
null
java.lang.RuntimeException: 出错了
finish
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: 出错了
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 出错了
    at com.example.mavendemo.Test10$UserService.computeError(Test10.java:40)
    at com.example.mavendemo.Test10.lambda$main$2(Test10.java:18)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 3 more
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test11 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
        CompletableFuture
        // ... the listing is cut off at this point in the source
13:15:46.529 [fish] INFO com.example.mavendemo.Test11$UserService - 这是第二个任务
13:15:46.529 [fish] INFO com.example.mavendemo.Test11$UserService - 这是第一个任务
这是第二个任务
finish
这是第二个任务
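The output above is what one would expect from CompletableFuture.anyOf, which completes as soon as any one of its inputs completes. Since the Test11 listing is incomplete, the following is only a sketch of that behavior under this assumption, not a reconstruction of the author's code; the class name AnyOfSketch and its method are made up.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class name, used only for this illustration.
class AnyOfSketch {

    static Object firstResult() {
        // "slow" is completed by hand further down, so "fast" is guaranteed to win
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");

        // anyOf returns CompletableFuture<Object>: it completes with the result
        // (or the exception) of whichever input finishes first
        CompletableFuture<Object> any = CompletableFuture.anyOf(slow, fast);
        Object winner = any.join();

        slow.complete("slow"); // finishing later does not change the anyOf result
        return winner;
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // fast
    }
}
```

Because the result type is Object, callers usually need a cast or an instanceof check before using the value.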
package com.example.mavendemo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

public class Test12 {

    static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    static UserService userService = new UserService();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(() -> userService.number("996"), fishExecutor);
        // thenCompose feeds the first result (s) into a function that itself returns
        // a CompletableFuture, and flattens the two stages into a single future
        CompletableFuture<String> thenCompose = first.thenCompose(s ->
                CompletableFuture.supplyAsync(
                        () -> userService.compute("这是第二个任务,接收到的入参是:" + s), fishExecutor));
        System.out.println(thenCompose.join());
        fishExecutor.shutdown();
    }

    @Slf4j
    static class UserService {
        public Integer number(String number) {
            log.info(number);
            return Integer.valueOf(number);
        }

        public String compute(String message) {
            log.info(message);
            return message;
        }
    }
}
13:57:16.096 [fish] INFO com.example.mavendemo.Test12$UserService - 996
13:57:16.098 [fish] INFO com.example.mavendemo.Test12$UserService - 这是第二个任务,接收到的入参是:996
这是第二个任务,接收到的入参是:996
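If a task throws, the exception is not reported on its own. It only surfaces when you call get() or join() to fetch the result (wrapping the call in try...catch), or when you handle it in the chain with a method such as exceptionally.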
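As a rough sketch of handling a failure inside the chain, the example below parses a string asynchronously. The class name ExceptionSketch and its methods are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class name, used only for this illustration.
class ExceptionSketch {

    // Fails with a NumberFormatException when text is not a number.
    static CompletableFuture<Integer> parse(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    // exceptionally: supplies a fallback value only when the stage failed.
    static int parseOrDefault(String text, int fallback) {
        return parse(text).exceptionally(ex -> fallback).join();
    }

    // handle: always runs and receives both the value and the exception (one of them is null).
    static String describe(String text) {
        return parse(text).handle((value, ex) -> {
            if (ex == null) {
                return "ok: " + value;
            }
            // The failure is normally wrapped in a CompletionException, so unwrap it
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            return "failed: " + cause.getClass().getSimpleName();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));   // 42
        System.out.println(parseOrDefault("oops", -1)); // -1
        System.out.println(describe("oops"));           // failed: NumberFormatException
    }
}
```

With either approach the join() call no longer throws, because the failure has already been converted into a normal value.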
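The get() method of CompletableFuture blocks the calling thread. If you use it to fetch the result of an asynchronous call, it is best to pass a timeout, for example get(10, TimeUnit.SECONDS).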
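A minimal sketch of a bounded wait, assuming a fallback value is acceptable when the result does not arrive in time. The class name GetWithTimeout, the method fetchOrFallback and the "fallback" string are all made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class name, used only for this illustration.
class GetWithTimeout {

    // Waits at most timeoutMillis for the result, then gives up and returns a fallback.
    static String fetchOrFallback(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return "fallback"; // too slow, or the task itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "fallback";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        System.out.println(fetchOrFallback(never, 100));                                     // fallback
        System.out.println(fetchOrFallback(CompletableFuture.completedFuture("done"), 100)); // done
    }
}
```

Note that a timeout on get() only stops the waiting; it does not cancel the underlying task.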
Prefer a custom thread pool, so that its configuration can be tuned to the workload.
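One possible shape for such a pool is sketched below. The sizes, queue capacity, rejection policy and the "order-pool-" thread name prefix are placeholder assumptions chosen for illustration, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class CustomPoolSketch {

    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                4,                                 // core pool size (assumed value)
                8,                                 // maximum pool size (assumed value)
                60L, TimeUnit.SECONDS,             // idle time before extra threads are released
                new ArrayBlockingQueue<>(100),     // bounded queue, so a backlog cannot grow without limit
                r -> new Thread(r, "order-pool-" + counter.incrementAndGet()), // named threads for readable logs
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, run the task on the submitting thread
    }

    // Runs one task on the custom pool and reports which thread executed it.
    static String runOnPool() {
        ThreadPoolExecutor pool = newPool();
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool()); // order-pool-1
    }
}
```

Compared with Executors.newCachedThreadPool, which the examples in this article use, an explicitly configured ThreadPoolExecutor puts an upper bound on both the number of threads and the number of queued tasks.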