已发布7年(Java8已经发布7年了,不会还有人没用过CompletableFuture吧)

前言

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

Future有哪些缺陷?

多线程的场景下,我们要监听每个线程异步执行的结果,如果用Future去实现,代码如下:

/** * Future异步示例 * * @author liudong * @date 2021/12/9 15:37 */ public class FutureDemo { public static void main(String args) throws Exception { // 1. 创建线程池 ExecutorService executorService = Executors.newCachedThreadPool(); // 2. 提交任务 ListCallableString tasks = new ArrayList(); ArrayListInteger taskIds = Lists.newArrayList(1, 2, 3); taskIds.forEach(taskId - { CallableString task = () - { Thread.sleep(1000); return "任务" taskId "执行完成!"; }; tasks.add(task); }); ListFutureString futures = executorService.invokeAll(tasks); // 3. 获取任务执行结果 for (FutureString future : futures) { String result = future.get(); System.out.println(result); } executorService.shutdown(); } }

执行结果

任务1执行完毕! 任务2执行完毕! 任务3执行完毕!

大家有没有思考过这样使用有没有什么问题?笔者认为有如下几个缺陷:

  • (1) 不能回调会阻塞
  • (2) 批量任务处理彼此依赖会阻塞
  • (3) 不能多个任务级联执行,将结果依次往下传递
  • (4) 得不到最先完成的任务

CompletableFuture能解决吗?

针对如上几个问题,看看CompletableFuture是怎么解决的。

使用CompletableFuture提供的supplyAsync和whenCompleteAsync两个方法优化以上代码,如下:

/** * CompletableFuture异步示例 * * @author liudong * @date 2021/12/9 15:37 */ public class CompletableFutureDemo { public static void main(String args) throws Exception { // 1. 创建线程池 ExecutorService executorService = Executors.newCachedThreadPool(); // 2. 提交任务 ListCallableString tasks = new ArrayList(); ArrayListInteger taskIds = Lists.newArrayList(1, 2, 3); // 3. 回调任务执行结果 ListCompletableFutureString completableFutures = new ArrayList(); CompletableFuture cfs = taskIds.stream().map((taskId) - { CompletableFutureString completableFuture = CompletableFuture.supplyAsync(() - { try { Thread.sleep(1000L); } catch (InterruptedException e) { } return "任务" taskId "执行完成!"; }, executorService); // 异步返回执行结果 completableFuture.whenCompleteAsync((result, exception) - { System.out.println(result); }); // 将处理结果传递到子任务 completableFuture.thenAccept((result) - { System.out.println("上级任务处理结果:" result); }); return completableFuture; }).toArray(CompletableFuture::new); // 获取最先执行完的任务 CompletableFutureObject firstEnd = CompletableFuture.anyOf(cfs); System.out.println("最先执行完的任务:" firstEnd.get()); executorService.shutdown(); } }

执行结果:

上级任务处理结果:任务1执行完成! 上级任务处理结果:任务3执行完成! 最先执行完的任务:任务1执行完成! 上级任务处理结果:任务2执行完成! 任务3执行完成! 任务2执行完成! 任务1执行完成!

以上可以看出,执行结果是异步打印,不会阻塞,也不会顺序依赖,能获取上级任务执行结果,并能够获取到最先执行完的任务。

扩展知识点:

  • (1) 创建异步操作:
    • runAsync:不支持返回值
    • supplyAsync:支持返回值
  • (2) 计算结果完成时的回调方法:
    • whenComplete:执行完当前任务的线程,继续执行 whenComplete 的任务。
    • whenCompleteAsync:执行完当前任务的线程,把whenCompleteAsync 的任务继续提交给线程池来执行。
    • exceptionally:当前任务出现异常时,执行exceptionally中的回调方法。
  • (3) 线程串行化:
    • thenApply:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
    • thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
    • thenRun:跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
    • handle:执行任务完成时,handle可以对结果进行处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
  • (4) 合并任务
    • thenCombine:用于合并任务,thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
    • thenCompose:thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。