学科分类
目录
Java基础

CompletableFuture类实现线程池管理

在使用Callable接口实现多线程时,会用到FutureTask类对线程执行结果进行管理和获取,由于该类在获取结果时是通过阻塞或者轮询的方式,违背多线程编程的初衷且耗费过多资源。为此,JDK 8中对FutureTask存在的不足进行了改进,新增了一个强大的函数式异步编程辅助类CompletableFuture,该类同时实现了Future接口和CompletionStage接口(Java 8中新增的一个线程任务完成结果接口),并对Future进行了强大的扩展,简化异步编程的复杂性。

在使用CompletableFuture类在进行线程管理时,通常会使用四个静态方法来为一段异步执行的代码创建CompletableFuture对象,具体如表1所示。

表1 CompletableFuture对象创建的四个静态方法

方法声明 功能描述
static CompletableFuture<Void> runAsync(Runnable runnable) 以Runnable函数式接口类型为参数,并使用ForkJoinPool.commonPool()作为它的线程池执行异步代码获取CompletableFuture计算结果为空的对象
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 以Runnable函数式接口类型为参数,并传入指定的线程池执行器executor来获取CompletableFuture计算结果为空的对象
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 以Supplier函数式接口类型为参数,并使用ForkJoinPool.commonPool()作为它的线程池执行异步代码获取CompletableFuture计算结果非空的对象
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 以Supplier函数式接口类型为参数,并传入指定的线程池执行器executor来获取CompletableFuture计算结果非空的对象

表1中,获取CompletableFuture对象的静态方法中,runAsync()和supplyAsync()方法的本质区别就是获取的CompletableFuture对象是否带有计算结果(类似于Runnable接口和Callable接口的区别)。另外,带有Executor参数的方法用于传入指定的线程池执行器来进行多线程管理,而未带有Executor参数的方法会使用默认的ForkJoinPool.commonPool()作为它的线程池进行多线程管理。

了解了创建CompletableFuture对象的常用方法后,接下来通过一个案例来演示如何通过CompletableFuture来实现线程池管理,如文件1所示。

文件1 Example19.java

 1    import java.util.concurrent.*;
 2    public class Example19 {
 3        public static void main(String[] args) throws InterruptedException,
 4                                                      ExecutionException {
 5            // 1、创建第一个线程,执行1到5相加运算
 6            CompletableFuture<Integer> completableFuture1 = 
 7                                    CompletableFuture.supplyAsync(() -> {
 8                int sum = 0, i = 0;
 9                while (i++ < 5) {
 10                    sum += i;
 11                    // 显示线程任务执行过程
 12                    System.out.println(Thread.currentThread().getName() 
 13                                         + "线程任务正在执行...i:" + i);
 14                }
 15                return sum;
 16            });
 17            // 创建第二个线程,执行6到10相加运算
 18            CompletableFuture<Integer> completableFuture2 = 
 19                                    CompletableFuture.supplyAsync(() -> {
 20                int sum = 0, j = 5;
 21                while (j++ < 10) {
 22                    sum += j;
 23                    System.out.println(Thread.currentThread().getName() 
 24                                         + "线程任务正在执行...j:" + j);
 25                }
 26                return sum;
 27            });
 28            // 2、将两个线程执行结果进行获取整合
 29            CompletableFuture<Integer> completableFuture3 = 
 30                        completableFuture1.thenCombine(completableFuture2,
 31                    (result1, result2) -> result1 + result2);
 32             System.out.println("1到10相加的结果为:"
 33                                 + completableFuture3.get());
 34        }
 35    }

运行结果如图1所示。

图1 运行结果

从图1可以看出,在执行1到10相加运算时将启动的两个线程worker-1和worker-2默认提交到了ForkJoinPool.commonPool()线程池进行管理,同时还可以整合获取这两个线程任务的执行结果。

小提示:

在运行文件1所示的代码时,有可能会出现只有线程池中的一个线程worker-1执行了全部1到5和6到10的相加运算,这是因为案例程序是通过线程池管理的,在执行1到5相加和6到10相加的两个子任务过程中,线程池中的空闲线程都会争夺任务执行权,所以就会出现某一个线程worker-1执行完一个任务后又抢到了任务执行权,那么1到10相加的任务可能就会处于某一个线程的执行管理中。

点击此处
隐藏目录