CompletableFuture原理与实践
1、CompletableFuture是什么
CompletableFuture
是Java 8引入的一个用于处理异步编程的类。它提供了一种方便的方式来执行异步操作并处理异步任务的结果。CompletableFuture
可以用于执行异步计算、处理任务的结果、组合多个异步任务等。
2、CompletableFuture的特性
- 异步执行任务: 你可以使用
CompletableFuture
来执行异步任务,这样你的代码可以继续执行其他操作,而不需要等待任务完成。 - 任务链式处理:
CompletableFuture
支持链式调用,你可以通过一系列的方法调用来处理异步任务的结果,从而实现更加复杂的任务流程。 - 组合多个任务: 你可以使用
thenCombine
、thenCompose
等方法将多个CompletableFuture
的结果进行组合,从而实现多个异步任务之间的协作。 - 异常处理: 你可以使用
exceptionally
、handle
等方法来处理异步任务中的异常情况。 - 等待任务完成:
CompletableFuture
提供了get
方法来等待任务的完成并获取结果,但也可以配合其他方法来进行更加灵活的等待操作。 - 并行处理:
CompletableFuture
可以结合Executor
来实现并行处理任务,从而充分利用多核处理器的性能。 - 异步计算: 除了处理IO密集型任务,
CompletableFuture
还可以用于执行异步计算任务,利用多线程进行计算操作。
3、CompletableFuture和Future
CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。
- Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。
- CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排。
4、从FutureTask开始
package com.xx.thread;
import java.util.concurrent.*;
/**
* @Author: wxz
* @Date: 2023/8/20 21:43
*/
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextInt(100) + "";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
//3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,会导致阻塞)
// System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());
//3秒钟后才出来结果,我只想等待1秒钟,超过1秒钟就不等了
// System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));
}
}
get()
阻塞 一旦调用get()方法,不管是否计算完成都会导致阻塞
package com.xx.thread;
import java.util.concurrent.*;
/**
* @Author: wxz
* @Date: 2023/8/20 21:43
*/
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "" + ThreadLocalRandom.current().nextInt(100);
});
new Thread(futureTask, "t1").start();
System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");
// 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break;
}
}
}
}
轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果
如果想要异步获取结果,通常都会以轮询的方式去获取结果 尽量不要阻塞
- 任务性质: 考虑任务的性质,如果任务需要监控中间状态,轮询可能更适合;如果任务完成后需要获取最终结果,阻塞等待可能更合适。
- 响应性要求: 考虑是否需要保持主线程的响应性。轮询方式可以在不阻塞主线程的情况下持续监控,而阻塞等待会阻塞当前线程。
- 超时和定时: 如果你需要实现超时处理或定时任务,轮询方式更具优势。阻塞等待可能需要额外的超时机制。
5、对Future的改进
1、类CompletableFuture
- 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了Future和CompletionStage接口
2、接口CompletionStage
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如: stage.thenApply(x -> square(x).thenAccept(x -> System.out.print(x)).thenRun(0) ->Svstem.out.printIn0))
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
6、核心的四个静态方法
1、runAsync 无 返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
2、supplyAsync 有 返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码
7、无返回值Code
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----task is over");
});
System.out.println(future.get());
}
}
ForkJoinPool.commonPool-worker-1 -----come in
-----task is over
null
8、有返回值Code
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextInt(100);
});
System.out.println(completableFuture.get());
}
}
9、回调方法
从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
int result = ThreadLocalRandom.current().nextInt(10);
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----计算结束耗时1秒钟,result: " + result);
if (result > 6) {
int age = 10 / 0;
}
return result;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("-----result: " + v);
}
}).exceptionally(e -> {
System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());
return -44;
});
System.out.println(completableFuture.get());
}
}
异步任务结束时,会自动回调某个对象的方法;
异步任务出错时,会自动回调某个对象的方法;
10、join和get对比
get会抛出异常,join不需要
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
int result = ThreadLocalRandom.current().nextInt(10);
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----计算结束耗时1秒钟,result: " + result);
if (result > 6) {
int age = 10 / 0;
}
return result;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("-----result: " + v);
}
}).exceptionally(e -> {
System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());
return -44;
});
System.out.println(completableFuture.join());
}
}
11、一个简单的例子-电商网站的比价
切记,功能→性能
经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理, 对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。我们去比同一个商品在各个平台上的价格,要求获得一个清单列表, 1 step by step,查完京东查淘宝,查完淘宝查天猫…
package com.xx.cf;
import lombok.Getter;
import org.springframework.util.StopWatch;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @Author: wxz
* @Date: 2023/8/28 15:07
*/
public class T1 {
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("tmall"),
new NetMall("pdd"),
new NetMall("mi")
);
public static List<String> findPriceSync(List<NetMall> list, String productName) {
return list.stream().map(mall -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName))).collect(Collectors.toList());
}
public static List<String> findPriceASync(List<NetMall> list, String productName) {
return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
}
public static void main(String[] args) {
StopWatch stopWatch1 = new StopWatch();
stopWatch1.start();
List<String> list1 = findPriceSync(list, "thinking in java");
for (String element : list1) {
System.out.println(element);
}
stopWatch1.stop();
System.out.println("----costTime: " + stopWatch1.getTotalTimeMillis() + " 毫秒");
StopWatch stopWatch2 = new StopWatch();
stopWatch2.start();
List<String> list2 = findPriceASync(list, "thinking in java");
for (String element : list2) {
System.out.println(element);
}
stopWatch2.stop();
System.out.println("----costTime: " + stopWatch2.getTotalTimeMillis() + " 毫秒");
}
}
class NetMall {
@Getter
private String netMallName;
public NetMall(String netMallName) {
this.netMallName = netMallName;
}
public double getPriceByName(String productName) {
return calcPrice(productName);
}
private double calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() + productName.charAt(0);
}
}
12、CompletableFuture常用方法
1、获得结果和触发计算
获取结果
// 不见不散 public T get() // 过时不候 public T get(long timeout, TimeUnit unit) // 没有计算完成的情况下,给我一个替代结果 // 立即获取结果不阻塞 计算完,返回计算完成后的结果 没算完,返回设定的valueIfAbsent值 public T getNow(T valueIfAbsent) public T join()
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 903;
});
// 去掉注释上面计算没有完成,返回914
// 开启注释上满计算完成,返回计算结果
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println(completableFuture.getNow(914));
}
}
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join());
}
}
主动触发计算
// 是否打断get方法立即返回括号值 public boolean complete(T value)
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 903;
});
// 注释掉暂停线程,get还没有算完只能返回complete方法设置的 914;暂停2秒钟线程,异步线程能够计算完成返回get
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
System.out.println(completableFuture.complete(914) + "\t" + completableFuture.get());
}
}
2、对计算结果进行处理
thenApply(Function fn)
- 输入参数:上一阶段的任务结果类型为 T。
- 返回值:新阶段的任务结果类型为 U。
- 功能:对上一阶段的任务结果进行转换操作,并返回一个新的 CompletableFuture 对象。
计算结果存在依赖关系,这两个线程串行化
由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v, e) -> {
System.out.println("*****v: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
handle
// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1024;
}).handle((f, e) -> {
int age = 10 / 0;
System.out.println("222" + f);
return f + 1;
}).handle((f, e) -> {
System.out.println("333" + f);
return f + 1;
}).whenCompleteAsync((v, e) -> {
System.out.println("*****v: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
当一个异常发生时,handle
方法会执行异常处理的逻辑,然后它会将返回值设置为 null
,表示处理后的结果。这就是为什么在异常发生后,f
的值会变成 null
。
具体来说,在你的代码中,第一个 handle
方法中有一个除以零的操作 int age = 10 / 0;
,这会引发一个异常。因此,异常发生时,handle
方法的异常处理逻辑会被执行,异常捕获后,f
被设置为 null
,然后异常继续传递到下一个 handle
方法。在第二个 handle
方法中,f
的值仍然是 null
,所以你会看到输出 “333null”。
在 whenCompleteAsync
方法中,v
是 handle
方法的返回值(即 f
的值),在异常发生时为 null
,因此你会看到 “v: null”。
whenComplete 和 whenCompleteAsync的区别:
whenComplete
:是执行当前任务的线程执行继续执行 whenComplete 的任务
whenCompleteAsync
: 是执行把whenCompleteAsync 这个任务继续提交给线程池来进行执行。
3、对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenApply(f -> {
return f + 4;
}).thenAccept(r -> System.out.println(r));
}
}
任务之间的顺序执行
thenRun
thenRun(Runnable runnable)
// 任务 A 执行完执行 B,并且 B 不需要 A 的结果
thenAccept
thenAccept(Consumer action)
// 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
thenApply
thenApply(Function fn)
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {
}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {
}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
}
}
4、对计算速度进行选用
applyToEither:谁快用谁
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2, f -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return f + 1;
});
System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}
}
5、对计算结果进行合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理
先完成的先等着,等待其它分支任务
thenCombine
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return 20;
});
CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return x + y;
});
System.out.println(thenCombineResult.get());
}
}
package com.xx.cf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: wxz
* @Date: 2023/8/28 14:56
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
return 20;
}), (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
return 30;
}), (a, b) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
return a + b;
});
System.out.println("-----主线程结束,END");
System.out.println(thenCombineResult.get());
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
13、CompletableFuture结合线程池使用
1、查询多次再聚合
@GetMapping("/queryCustomerInfo")
@Operation(summary = "CompletableFuture+自定义线程池分批查询")
public List<Integer> queryCustomerInfo() {
List<Integer> result = new ArrayList<>();
CompletableFuture<List<Integer>> customerInfoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "1");
List<Integer> customerInfo1 = this.getCustomerInfo1();
return customerInfo1;
}, threadPoolTaskExecutor);
CompletableFuture<List<Integer>> scoreFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "2");
List<Integer> customerInfo2 = this.getCustomerInfo2();
return customerInfo2;
}, threadPoolTaskExecutor);
CompletableFuture<List<Integer>> orderFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "3");
List<Integer> customerInfo3 = this.getCustomerInfo3();
return customerInfo3;
}, threadPoolTaskExecutor);
//等待所有任务完成
CompletableFuture.allOf(customerInfoFuture, scoreFuture, orderFuture).join();
Stream.of(customerInfoFuture, scoreFuture, orderFuture).map(CompletableFuture::join).collect(Collectors.toList()).forEach(result::addAll);
return result;
}
public List<Integer> getCustomerInfo1() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(1, 2, 3);
}
public List<Integer> getCustomerInfo2() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(4, 5);
}
public List<Integer> getCustomerInfo3() {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(6, 7);
}
2、Stream流的方式对数据进行分批次处理
@GetMapping("/doLimitByStream")
@Operation(summary = "CompletableFuture+自定义线程池分批查询 Stream流的方式对数据进行分批次处理")
public void doLimitByStream() {
// 1 数据总量集合
List<Integer> list = queryCustomerInfo();
// 2 限流分批,打算每次处理几条或者几页,开发自己设定,支持第3方分批限流
int pageSize = 5;//可以配置
// 3
int pages = (int) Math.ceil(list.size() * 1.0 / pageSize);
for (int i = 1; i <= pages; i++) {
List<Integer> batchList = list.stream().skip(pageSize * (i - 1)).limit(pageSize).collect(Collectors.toList());
System.out.println("batchList: " + i + " = " + batchList);
}
}
3、Guava的方式对数据进行分批次处理
@GetMapping("/doLimitByGuava")
@Operation(summary = "CompletableFuture+自定义线程池分批查询 Guava的方式对数据进行分批次处理")
public void doLimitByGuava() {
//1 数据总量集合
List<Integer> list = queryCustomerInfo();
int pageSize = 3; // 可以配置
if (list != null) {
List<List<Integer>> partition = Lists.partition(list, pageSize);
for (List<Integer> integerList : partition) {
System.out.println(integerList);
}
}
}
14、线程池循环引用会导致死锁
package com.xx.cf;
import java.util.concurrent.*;
/**
* @Author: wxz
* @Date: 2023/9/6 20:24
*/
public class CfCyclic {
public static void main(String[] args) {
ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("1111");
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
cf1.join();
threadPool1.shutdown();
}
}
- 在第一个 CompletableFuture 中,你使用了
CompletableFuture.supplyAsync
来执行一个任务,并在其中又创建了一个子任务。这个子任务也使用了相同的线程池threadPool1
来执行。 - 在第一个 CompletableFuture 中,你调用了
join()
方法来等待任务的完成。这将阻塞当前线程,直到 CompletableFuture 完成。 - 在第一个 CompletableFuture 中,你的任务返回了另一个 CompletableFuture,然后立即调用了
join()
方法等待它的完成。 - 子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复
package com.xx.cf;
import java.util.concurrent.*;
/**
* @Author: wxz
* @Date: 2023/9/6 20:24
*/
public class CfCyclic {
public static void main(String[] args) {
ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
ExecutorService threadPool2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("1111");
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool2).join();//子任务
}, threadPool1);
cf1.join();
threadPool1.shutdown();
threadPool2.shutdown();
}
}
package com.xx.cf;
import java.util.concurrent.*;
/**
* @Author: wxz
* @Date: 2023/9/6 20:24
*/
public class CfCyclic {
public static void main(String[] args) {
ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
ExecutorService threadPool2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("1111");
// 执行主任务逻辑
return "result";
}, threadPool1).thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println("child");
// 执行子任务逻辑,使用不同的线程池
return "child";
}, threadPool2));
cf1.join();
threadPool1.shutdown();
threadPool2.shutdown();
}
}