博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java8新的异步编程方式 CompletableFuture(二)
阅读量:6684 次
发布时间:2019-06-25

本文共 10782 字,大约阅读时间需要 35 分钟。

,讲述了Future模式的机制、缺点,CompletableFuture产生的由来、静态工厂方法、complete()方法等等。

本文将继续整理CompletableFuture的特性。

3.3 转换

我们可以通过CompletableFuture来异步获取一组数据,并对数据进行一些转换,类似RxJava、Scala的map、flatMap操作。

3.3.1 map

方法名 描述
thenApply(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture
thenApplyAsync(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用ForkJoinPool
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用指定的线程池

thenApply的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>。

CompletableFuture
future = CompletableFuture.supplyAsync(() -> "Hello"); future = future.thenApply(new Function
() { @Override public String apply(String s) { return s + " World"; } }).thenApply(new Function
() { @Override public String apply(String s) { return s.toUpperCase(); } }); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

再用lambda表达式简化一下

CompletableFuture
future = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World").thenApply(String::toUpperCase); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

HELLO WORLD复制代码

下面的例子,展示了数据流的类型经历了如下的转换:String -> Integer -> Double。

CompletableFuture
future = CompletableFuture.supplyAsync(() -> "10") .thenApply(Integer::parseInt) .thenApply(i->i*10.0); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

100.0复制代码

3.3.2 flatMap

方法名 描述
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用ForkJoinPool。
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用指定的线程池。

thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。

CompletableFuture
future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

Hello World复制代码

下面的例子展示了多次调用thenCompose()

CompletableFuture
future = CompletableFuture.supplyAsync(() -> "100") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100")) .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s))); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

100100.0复制代码

3.4 组合

方法名 描述
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。

现在有CompletableFuture<T>、CompletableFuture<U>和一个函数(T,U)->V,thenCompose就是将CompletableFuture<T>和CompletableFuture<U>变为CompletableFuture<V>。

CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture
future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i)); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

100100.0复制代码

使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。

thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture类型。

方法名 描述
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。
CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture
future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i))); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

100100.0复制代码

3.5 计算结果完成时的处理

当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。

###3.5.1 执行特定的Action

方法名 描述
whenComplete(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用ForkJoinPool。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用指定的线程池。
CompletableFuture.supplyAsync(() -> "Hello")                .thenApply(s->s+" World")                .thenApply(s->s+ "\nThis is CompletableFuture demo")                .thenApply(String::toLowerCase)                .whenComplete((result, throwable) -> System.out.println(result));复制代码

执行结果:

hello worldthis is completablefuture demo复制代码

###3.5.2 执行完Action可以做转换

方法名 描述
handle(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用ForkJoinPool。
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用指定的线程池。
CompletableFuture
future = CompletableFuture.supplyAsync(() -> "100") .thenApply(s->s+"100") .handle((s, t) -> s != null ? Double.parseDouble(s) : 0); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }复制代码

执行结果:

100100.0复制代码

在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。

@FunctionalInterfacepublic interface BiFunction
{ /** * Applies this function to the given arguments. * * @param t the first function argument * @param u the second function argument * @return the function result */ R apply(T t, U u); /** * Returns a composed function that first applies this function to * its input, and then applies the {
@code after} function to the result. * If evaluation of either function throws an exception, it is relayed to * the caller of the composed function. * * @param
the type of output of the {
@code after} function, and of the * composed function * @param after the function to apply after this function is applied * @return a composed function that first applies this function and then * applies the {
@code after} function * @throws NullPointerException if after is null */ default
BiFunction
andThen(Function
after) { Objects.requireNonNull(after); return (T t, U u) -> after.apply(apply(t, u)); }}复制代码

而whenComplete()的参数是BiConsumer,accept()方法返回void。

@FunctionalInterfacepublic interface BiConsumer
{ /** * Performs this operation on the given arguments. * * @param t the first input argument * @param u the second input argument */ void accept(T t, U u); /** * Returns a composed {
@code BiConsumer} that performs, in sequence, this * operation followed by the {
@code after} operation. If performing either * operation throws an exception, it is relayed to the caller of the * composed operation. If performing this operation throws an exception, * the {
@code after} operation will not be performed. * * @param after the operation to perform after this operation * @return a composed {
@code BiConsumer} that performs in sequence this * operation followed by the {
@code after} operation * @throws NullPointerException if {
@code after} is null */ default BiConsumer
andThen(BiConsumer
after) { Objects.requireNonNull(after); return (l, r) -> { accept(l, r); after.accept(l, r); }; }}复制代码

所以,handle()相当于whenComplete()+转换。

###3.5.3 纯消费(执行Action)

方法名 描述
thenAccept(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值
thenAcceptAsync(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值,使用ForkJoinPool。
thenAcceptAsync(Consumer<? super T> action, Executor executor) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值

thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。

CompletableFuture.supplyAsync(() -> "Hello")                .thenApply(s->s+" World")                .thenApply(s->s+ "\nThis is CompletableFuture demo")                .thenApply(String::toLowerCase)                .thenAccept(System.out::print);复制代码

执行结果:

hello worldthis is completablefuture demo复制代码

转载地址:http://fdaao.baihongyu.com/

你可能感兴趣的文章
当iPhone不再流行 Android它将如何面对未来?
查看>>
web前端浅谈,htmlcss脱离标准文档流相关
查看>>
Springmvc+mybatis+shiro+Dubbo+ZooKeeper+Redis
查看>>
免费的容器架构可视化工具 | 阿里云应用高可用服务 AHAS 发布重大新特性
查看>>
随着加密货币市场稳定 比特币价格不可避免的会下降
查看>>
跟我学习dubbo-Dubbo管理控制台的安装(3)
查看>>
构建微服务:Spring boot
查看>>
物联网落地三大困境破解
查看>>
设置tomcat 启动参数
查看>>
Canal简介及配置说明
查看>>
mybatis知识点
查看>>
jQuery easyui
查看>>
flex datagrid 数据导出
查看>>
VARCHAR2长度限制
查看>>
rabbitMQ消息队列原理
查看>>
Nagios之安装篇
查看>>
平衡二叉树中第k小的数 Kth Smallest Element in a BST
查看>>
我的友情链接
查看>>
Vyos防火墙功能配置
查看>>
Redhat内核编译
查看>>