Java 8来了,是时候学一下新的东西了。Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进。或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture。众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的。CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步于旧版本的Java中。如果你打开JavaDoc of CompletableFuture你一定会感到震惊。大约有五十种方法(!),而且它们中的一些非常有意思而且不好理解,例如:
public <U,V> CompletableFuture<V> thenCombineAsync( CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
一、提取、修改包装的值
通常futures代表其它线程中运行的代码,但事实并非总是如此。有时你想要创造一个Future来表示你知道将会发生什么,例如JMS message arrival。所以你有Future但是未来并没有潜在的异步工作。你只是想在未来JMS消息到达时简单地完成(解决),这是由一个事件驱动的。在这种情况下,你可以简单地创建CompletableFuture来返还给你的客户端,只要你认为你的结果是可用的,仅仅通过complete()就能解锁所有等待Future的客户端。
首先你可以简单地创建新的CompletableFuture并且给你的客户端:
public CompletableFuture<String> ask() { final CompletableFuture<String> future = new CompletableFuture<>(); //... return future; }
future.complete("42")
有时你想要看到信号发生故障的情况,如你所知Future对象可以处理它所包含的结果或异常。如果你想进一步传递一些异常,可以用CompletableFuture.completeExceptionally(ex) (或者用obtrudeException(ex)这样更强大的方法覆盖前面的异常)。 completeExceptionally()也能解锁所有等待的客户端,但这一次从get()抛出异常。说到get(),也有CompletableFuture.join()方法在错误处理方面有着细微的变动。但总体上,它们都是一样的。最后也有CompletableFuture.getNow(valueIfAbsent)方法没有阻塞但是如果Future还没完成将返回默认值,这使得当构建那种我们不想等太久的健壮系统时非常有用。
最后static的方法是用completedFuture(value)来返回已经完成Future的对象,当测试或者写一些适配器层时可能非常有用。
二、创造和获取CompletableFuture
好了,那么手动地创建CompletableFuture是我们唯一的选择吗?不一定。就像一般的Futures,我们可以关联存在的任务,同时CompletableFuture使用工厂方法:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
无参方法Executor是以…Async结尾同时将会使用ForkJoinPool.commonPool()(全局的,在JDK8中介绍的通用池),这适用于CompletableFuture类中的大多数的方法。runAsync()易于理解,注意它需要Runnable,因此它返回CompletableFuture<Void>作为Runnable不返回任何值。如果你需要处理异步操作并返回结果,使用Supplier<U>:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { //...long running... return "42"; } }, executor);
但是别忘了,Java 8里面还有lambdas表达式呢!
finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //...long running... return "42"; }, executor);
或者:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
虽然这篇文章不是关于Lambda的,但是我会相当频繁地使用lambda表达式。
三、转换和作用于CompletableFuture(thenApply)
我说过CompletableFuture优于Future但是你还不知道为什么吗?简单说,因为CompletableFuture是一个原子也是一个因子。我说的这句话没什么帮助吗?Scala和JavaScript都允许future完成时允许注册异步回调,直到它准备好我们才要等待和阻止它。我们可以简单地说:运行这个函数时就出现了结果。此外,我们可以叠加这些功能,把多个future组合在一起等。例如如果我们从String转为Integer,我们可以转为在不关联的前提下从CompletableFuture到 CompletableFuture<Integer。这是通过thenApply()的方法:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);<p></p> <p>如前所述...Async版本提供对CompletableFuture的大多数操作,因此我将在后面的部分中跳过它们。记住,第一个方法将在future完成的相同线程中调用该方法,而剩下的两个将在不同的线程池中异步地调用它。 让我们来看看thenApply()的工作流程:</p> <p><pre> CompletableFuture<String> f1 = //... CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt); CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI); </p>
或在一个声明中:
CompletableFuture<Double> f3 = f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
这里,你会看到一个序列的转换,从String到Integer再到Double。但最重要的是,这些转换既不立即执行也不停止。这些转换既不立即执行也不停止。他们只是记得,当原始f1完成他们所执行的程序。如果某些转换非常耗时,你可以提供你自己的Executor来异步地运行他们。注意,此操作相当于Scala中的一元map。
四、运行完成的代码(thenAccept/thenRun)
CompletableFuture<Void> thenAccept(Consumer<? super T> block); CompletableFuture<Void> thenRun(Runnable action);
在future的管道里有两种典型的“最终”阶段方法。他们在你使用future的值的时候做好准备,当 thenAccept()提供最终的值时,thenRun执行 Runnable,这甚至没有方法去计算值。例如:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor); log.debug("Continuing");
五、单个CompletableFuture的错误处理
到目前为止,我们只讨论计算的结果。那么异常呢?我们可以异步地处理它们吗?当然!
CompletableFuture<String> safe = future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
CompletableFuture<Integer> safe = future.handle((ok, ex) -> { if (ok != null) { return Integer.parseInt(ok); } else { log.warn("Problem", ex); return -1; } });
六、一起结合两个CompletableFuture
异步处理过程之一的CompletableFuture非常不错但是当多个这样的futures以各种方式组合在一起时确实显示了它的强大。
七、结合(链接)这两个futures(thenCompose())
有时你想运行一些future的值(当它准备好了),但这个函数也返回了future。CompletableFuture足够灵活地明白我们的函数结果现在应该作为顶级的future,对比CompletableFuture<CompletableFuture>。方法 thenCompose()相当于Scala的flatMap:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
CompletableFuture<Document> docFuture = //... CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance); CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance); //... private CompletableFuture<Double> calculateRelevance(Document doc) //...
thenCompose()是一个重要的方法允许构建健壮的和异步的管道,没有阻塞和等待的中间步骤。
八、两个futures的转换值(thenCombine())
当thenCompose()用于链接一个future时依赖另一个thenCombine,当他们都完成之后就结合两个独立的futures:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123); CompletableFuture<Shop> shopFuture = closestShop(); CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop)); //... private Route findRoute(Customer customer, Shop shop) //...
customerFuture.thenCombine(shopFuture, this::findRoute);
你也知道,我们有customerFuture 和 shopFuture。那么routeFuture包装它们然后“等待”它们完成。当他们准备好了,它会运行我们提供的函数来结合所有的结果(findRoute())。当两个基本的futures完成并且 findRoute()也完成时,这样routeFuture将会完成。
九、等待所有的 CompletableFutures 完成
如果不是产生新的CompletableFuture连接这两个结果,我们只是希望当完成时得到通知,我们可以使用thenAcceptBoth()/runAfterBoth()系列的方法,(…Async 变量也是可用的)。它们的工作方式与thenAccept() 和 thenRun()类似,但是是等待两个futures而不是一个:
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block) CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
想象一下上面的例子,这不是产生新的 CompletableFuture,你只是想要立刻发送一些事件或刷新GUI。这可以很容易地实现:thenAcceptBoth():
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> { final Route route = findRoute(cust, shop); //refresh GUI with route });
我希望我是错的,但也许有些人会问自己一个问题:为什么我不能简单地阻塞这两个futures呢? 就像:
Future<Customer> customerFuture = loadCustomerDetails(123); Future<Shop> shopFuture = closestShop(); findRoute(customerFuture.get(), shopFuture.get());
好了,你当然可以这么做。但是最关键的一点是CompletableFuture是允许异步的,它是事件驱动的编程模型而不是阻塞并急切地等待着结果。所以在功能上,上面两部分代码是等价的,但后者没有必要占用一个线程来执行。
十、等待第一个 CompletableFuture 来完成任务
另一个有趣的事是CompletableFutureAPI可以等待第一个(与所有相反)完成的future。当你有两个相同类型任务的结果时就显得非常方便,你只要关心响应时间就行了,没有哪个任务是优先的。API方法(…Async变量也是可用的):
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block) CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
作为一个例子,你有两个系统可以集成。一个具有较小的平均响应时间但是拥有高的标准差,另一个一般情况下较慢,但是更加容易预测。为了两全其美(性能和可预测性)你可以在同一时间调用两个系统并等着谁先完成。通常这会是第一个系统,但是在进度变得缓慢时,第二个系统就可以在可接受的时间内完成:
CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); fast.acceptEither(predictable, s -> { System.out.println("Result: " + s); });
s代表了从fetchFast()或是fetchPredictably()得到的String。我们不必知道也无需关心。
十一、完整地转换第一个系统
applyToEither()算是 acceptEither()的前辈了。当两个futures快要完成时,后者只是简单地调用一些代码片段,applyToEither()将会返回一个新的future。当这两个最初的futures完成时,新的future也会完成。API有点类似于(…Async 变量也是可用的):
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)
CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); CompletableFuture<String> firstDone = fast.applyToEither(predictable, Function.<String>identity());
十二、多种结合的CompletableFuture
我们现在知道如何等待两个future来完成(使用thenCombine())并第一个完成(applyToEither())。但它可以扩展到任意数量的futures吗?的确,使用static辅助方法:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs) static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)
总结
我们探索了整个CompletableFuture API。我确信这样就能战无不胜了,所以在下一篇文章中我们将研究另一个简单的web爬虫程序的实现,使用CompletableFuture方法和Java 8 lambda表达式,我们也会看看CompletableFuture的
本文向大家介绍awk 详解。相关面试题,主要包含被问及awk 详解。时的应答技巧和注意事项,需要的朋友参考一下 答案: awk '{pattern + action}' { filenames } #cat /etc/passwd |awk -F ':' '{print 1"t"7}' //-F 的意思是以':'分隔 root /bin/bash daemon /bin/sh 搜索/etc/pas
glob 是由普通字符和/或通配字符组成的字符串,用于匹配文件路径。可以利用一个或多个 glob 在文件系统中定位文件。 src() 方法接受一个 glob 字符串或由多个 glob 字符串组成的数组作为参数,用于确定哪些文件需要被操作。glob 或 glob 数组必须至少匹配到一个匹配项,否则 src() 将报错。当使用 glob 数组时,将按照每个 glob 在数组中的位置依次执行匹配 - 这
scanf()函数详解 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> //01.scanf();函数扫描输入事项: // 格式必须一一匹配:非格式控制符的可见字符必须一一匹配输入 int main01(void) { int num = 0; printf("%p \n", &nu
printf()详解 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> //01.printf();&sprintf();&fprintf();格式控制字符串详解: // (1).格式控制字符串的组成: // 普通字符串+格式控制符 // (2).常见的格式控制字符: // %f-
主要内容:state状态,自定义资源共享方式,源码实现AQS是AbstractQueuedSynchronizer的简称。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,如下图所示。AQS为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。子类们必须定义改变state变量的protected方法,这些方法定义了state是如何被获取或释放的。鉴于此,本类中的其他方法执行所有的排队和阻塞机制。子类
主要内容:state状态,自定义资源共享方式,源码实现AQS是AbstractQueuedSynchronizer的简称。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,如下图所示。AQS为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。子类们必须定义改变state变量的protected方法,这些方法定义了state是如何被获取或释放的。鉴于此,本类中的其他方法执行所有的排队和阻塞机制。子类
主要内容:6 DataNode(面试开发重点)6 DataNode(面试开发重点) 6.1 DataNode工作机制 DataNode工作机制,如图3-15所示。 1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。 2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。 3)心跳是每3秒一次
主要内容:4 HDFS的数据流,5 NameNode和SecondaryNameNode(面试开发重点)4 HDFS的数据流 4.1 HDFS写数据流程 4.1.1 剖析文件写入 HDFS写数据流程 1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。 2)NameNode返回是否可以上传。 3)客户端请求第一个 Block上传到哪几个DataNode服务器上。 4)NameNode返回3个Data