搞懂多线程(三)之Future接口

一、概念

Future:是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

二、Future常见方法

接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。


三、FutureTask实现类

1、背景

FutureTask:通过实现RunnableFuture子接口的形式,间接实现了Future接口,而且可以通过构造注入的方式,传入RunnableCallable对象,所以既能满足创建多线程的所有要求,也能满足Future的计算功能。

2、使用

public class FutureTaskDemo {

    public static void main(String[] args) throws Exception {

        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        new Thread(futureTask, "AAA").start();
        
        String futureTaskResult = futureTask.get();
        //String futureTaskResult = futureTask.get(1,TimeUnit.SECONDS); //手动设置等待时间,超过等待时间则抛出异常。
        System.out.println("futureTask.get..." + futureTaskResult);

        System.out.println("main....");
    }
}

class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("MyThread....call....");

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return "hello";
    }
}

注意点:

  • FutureTask.get()为阻塞的,阻塞过程中会影响主线程操作。通常情况下get()会放在整个程序最后。

  • FutureTask.get(long timeout, TimeUnit unit)可手动设置等待时间,超过等待时间则抛出异常。

  • 可以使用isDone()方法来检测方法是否处理完成。但会引起CPU轮询

while (true){
     if (futureTask.isDone()){
          System.out.println("futureTask.get..." + futureTaskResult);
          break;
      }else {
          TimeUnit.MILLISECONDS.sleep(100);
          System.out.println("正在处理中...");
      }
}
  • 可以使用线程池来加快程序运行

public static void main(String[] args) {
    //创建一个初始和最大线程个数都为为3的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    FutureTask<String> futureTask1 = new FutureTask<>(() -> {
        System.out.println("futureTask1....处理中");
        return "over";
    });
    FutureTask<String> futureTask2 = new FutureTask<>(() -> {
        System.out.println("futureTask2....处理中");
        return "over";
    });

    //提交线程任务
    threadPool.submit(futureTask1);
    threadPool.submit(futureTask2);
    //需要关闭线程池
    threadPool.shutdown();
}

3、优缺点

优点:可以利用多线程加快程序运行

缺点:获取运行结果不方便,无法完成复杂的线程问题,例如线程结果依赖。


三、CompletableFuture实现类

1、背景

CompletableFuture:实现了FutureCompletionStage两个接口,所以拥有了这两个接口的功能,既能完成Future异步线程功能,又能对线程间的依赖进行处理。

2、使用

a、四个重要的静态方法

  • CompletableFuture<Void> runAsync(Runnable runnable)使用默认线程池创建,无返回值

  • CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)使用自定义线程池创建,无返回值

  • CompletableFuture<U> supplyAsync(Supplier<U> supplier)使用默认线程池创建,有返回值

  • CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)使用自定义线程池创建,有返回值

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        //使用默认线程池ForkJoinPool.commonPool()
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
        });
        //无返回值,get()为null
        System.out.println(runAsync.get());
    
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        //使用线程池,并且有返回值
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "hello-world";
        },threadPool);
    
        System.out.println(supplyAsync.get());
    
        threadPool.shutdown();
    
    }

b、其他方法

获取结构和触发计算

  • get():用于获取结果值,需要抛出InterruptedExceptionExecutionException异常,会阻塞

  • get(long timeout, TimeUnit unit):用于获取结果值,超过等待时间抛TimeoutException,需要抛出InterruptedExceptionExecutionException异常,会阻塞

  • join():用于获取结果值,无需抛出异常,会阻塞

  • getNow(T valueIfAbsent):立马返回结果值,如果计算完成则返回计算结果,否则返回valueIfAbsent

  • complete(T value):是否计算完成,返回一个bool值,如果计算完成,调用get()/join()时返回计算结果,否则返回value

对结算结果进行处理

  • thenApply(Function<? super T,? extends U> fn):可拿到上一步的结果进行后续处理,如果出现异常,直接抛出异常

  • handle(BiFunction<? super T, Throwable, ? extends U> fn):可拿到上一步的结果进行后续处理,如果出现异常,可进行手动处理异常,继续向下执行

  • thenAccept(Consumer<? super T> action):可拿到上一步的结果进行后续处理,无返回值

  • thenRun(Runnable action):开启一个新的线程,和之前线程没有关系

链式调用线程池thenRun()和thenRunSync()特殊说明

  1. 没有传入自定义线程池,都用默认线程池ForkJoinPool

  1. 传入了一个自定义线程池,

    1. 如果你执行第一个任务的时候,传入了一个自定义线程池

    2. 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

  2. 有可能处理太快,系统优化切换原则,直接使用main线程处理

  3. 其它如: thenAcceptthenAcceptAsyncthenApplythenApplyAsync等,它们之间的区别也是同理

对计算结果进行选用

  • applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):选出较快的一个对象,返回CompletableFuture对象

public static void main(String[] args) {
    CompletableFuture<String> supplyAsyncAA = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "AA";
    });

    CompletableFuture<String> supplyAsyncBB = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "BB";
    });

    CompletableFuture<String> applyToEither = supplyAsyncAA.applyToEither(supplyAsyncBB, result -> {
        return "时间短的是" + result;
    });

    System.out.println(applyToEither.join());


}

对结果合并

  • thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn):对结果进行合并

CompletableFuture<String> thenCombine = supplyAsyncAA.thenCombine(supplyAsyncBB, (aResult, bResult) -> {
     return aResult + bResult;
});
ystem.out.println(thenCombine.join());