搞懂多线程(三)之Future接口
一、概念
Future:是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
二、Future常见方法
接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
三、FutureTask实现类
1、背景
FutureTask:通过实现RunnableFuture
子接口的形式,间接实现了Future
接口,而且可以通过构造注入的方式,传入Runnable
和Callable
对象,所以既能满足创建多线程的所有要求,也能满足Future
的计算功能。
2、使用
public class FutureTaskDemo {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask, "AAA").start();
String futureTaskResult = futureTask.get();
//String futureTaskResult = futureTask.get(1,TimeUnit.SECONDS); //手动设置等待时间,超过等待时间则抛出异常。
System.out.println("futureTask.get..." + futureTaskResult);
System.out.println("main....");
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("MyThread....call....");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello";
}
}
注意点:
FutureTask.get()
为阻塞的,阻塞过程中会影响主线程操作。通常情况下get()
会放在整个程序最后。FutureTask.get(long timeout, TimeUnit unit)
可手动设置等待时间,超过等待时间则抛出异常。可以使用
isDone()
方法来检测方法是否处理完成。但会引起CPU轮询
while (true){
if (futureTask.isDone()){
System.out.println("futureTask.get..." + futureTaskResult);
break;
}else {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("正在处理中...");
}
}
可以使用线程池来加快程序运行
public static void main(String[] args) {
//创建一个初始和最大线程个数都为为3的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
FutureTask<String> futureTask1 = new FutureTask<>(() -> {
System.out.println("futureTask1....处理中");
return "over";
});
FutureTask<String> futureTask2 = new FutureTask<>(() -> {
System.out.println("futureTask2....处理中");
return "over";
});
//提交线程任务
threadPool.submit(futureTask1);
threadPool.submit(futureTask2);
//需要关闭线程池
threadPool.shutdown();
}
3、优缺点
优点:可以利用多线程加快程序运行
缺点:获取运行结果不方便,无法完成复杂的线程问题,例如线程结果依赖。
三、CompletableFuture实现类
1、背景
CompletableFuture:实现了Future
和CompletionStage
两个接口,所以拥有了这两个接口的功能,既能完成Future
异步线程功能,又能对线程间的依赖进行处理。
2、使用
a、四个重要的静态方法
CompletableFuture<Void> runAsync(Runnable runnable)
:使用默认线程池创建,无返回值CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
:使用自定义线程池创建,无返回值CompletableFuture<U> supplyAsync(Supplier<U> supplier)
:使用默认线程池创建,有返回值CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
:使用自定义线程池创建,有返回值public static void main(String[] args) throws ExecutionException, InterruptedException { //使用默认线程池ForkJoinPool.commonPool() CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); }); //无返回值,get()为null System.out.println(runAsync.get()); ExecutorService threadPool = Executors.newFixedThreadPool(3); //使用线程池,并且有返回值 CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "hello-world"; },threadPool); System.out.println(supplyAsync.get()); threadPool.shutdown(); }
b、其他方法
获取结构和触发计算
get():用于获取结果值,需要抛出
InterruptedException
和ExecutionException
异常,会阻塞get(long timeout, TimeUnit unit):用于获取结果值,超过等待时间抛
TimeoutException
,需要抛出InterruptedException
和ExecutionException
异常,会阻塞join():用于获取结果值,无需抛出异常,会阻塞
getNow(T valueIfAbsent):立马返回结果值,如果计算完成则返回计算结果,否则返回
valueIfAbsent
值complete(T value):是否计算完成,返回一个bool值,如果计算完成,调用get()/join()时返回计算结果,否则返回
value
值
对结算结果进行处理
thenApply(Function<? super T,? extends U> fn):可拿到上一步的结果进行后续处理,如果出现异常,直接抛出异常
handle(BiFunction<? super T, Throwable, ? extends U> fn):可拿到上一步的结果进行后续处理,如果出现异常,可进行手动处理异常,继续向下执行
thenAccept(Consumer<? super T> action):可拿到上一步的结果进行后续处理,无返回值
thenRun(Runnable action):开启一个新的线程,和之前线程没有关系
链式调用线程池thenRun()和thenRunSync()特殊说明
没有传入自定义线程池,都用默认线程池
ForkJoinPool
传入了一个自定义线程池,
如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
有可能处理太快,系统优化切换原则,直接使用main线程处理
其它如:
thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它们之间的区别也是同理
对计算结果进行选用
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):选出较快的一个对象,返回
CompletableFuture
对象
public static void main(String[] args) {
CompletableFuture<String> supplyAsyncAA = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "AA";
});
CompletableFuture<String> supplyAsyncBB = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "BB";
});
CompletableFuture<String> applyToEither = supplyAsyncAA.applyToEither(supplyAsyncBB, result -> {
return "时间短的是" + result;
});
System.out.println(applyToEither.join());
}
对结果合并
thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn):对结果进行合并
CompletableFuture<String> thenCombine = supplyAsyncAA.thenCombine(supplyAsyncBB, (aResult, bResult) -> {
return aResult + bResult;
});
ystem.out.println(thenCombine.join());