CompositeFuture 是一种特殊的 Future,它可以包装一个 Future 列表,从而让一组异步操作并行执行;然后协调这一组操作的结果,作为 CompositeFuture 的结果。本文将介绍 CompositeFuture 的用法以及几种协调方式,掌握这些内容有助于解决多个 Future 的整合问题。
开发人员从两个方面关注一个 Future:状态和结果。CompositeFuture 是 Future,同样要从这两个方面关注它。
CompositeFuture 的结果(消息)类型是 CompositeFuture 本身,也就是说 CompositeFuture#result()
返回值类型固定是 CompositeFuture。它可以用来表示一组结果,可以通过 list()
或 causes()
一次性读取一组结果或异常,也可以通过resultAt(index)
或cause(index)
读取单个结果或异常。
CompositeFuture cf = CompositeFuture.all(future1, future2); cf.onSuccess(res -> { CompositeFuture result = cf.result(); // 它的结果也是 CompositeFuture 类型 List<Double> results = result.list(); // 一次性读取一组结果 List<Throwable> cause = result.causes(); // 一次性读取一组异常 Double r1 = result.resultAt(0); // 读取第 0 个 Future 的结果 Throwbale t1 = result.cause(0); // 读取第 0 个 Future 的异常 });
CompositeFuture 的状态随着列表中 Future 状态的变化而变化。例如:当列表中所有 Future 都成功完成时,CompositeFuture 才是成功完成;或者,当列表中只要有一个 Future 成功完成时,CompositeFuture 就是成功完成。
CompositeFuture 作为 Future 协调器,提供了 all, any, join 三种协调方式。所谓协调,就是把一组 Future 包装成一个新的 Future,即 CompositeFuture,它的状态随着所包装列表中 Future 状态的变化而变化。
下面通过几种场景示例详细介绍这几种协调方式。
假如有一个金融系统,它有一个功能是统计某个客户的银行存款总额,需要向各个不同的银行系统发送余额查询请求,然后把这些请求的返回结果相加作为最终结果。一种高效的方式是并行发送请求,然后在所有请求都有结果时将结果汇总。使用 CompositeFuture 的 all 操作可以很方便地达到这个目的。
Future<Double> f1 = queryBalance("bank_a", "Robothy"); Future<Double> f2 = queryBalance("bank_b", "Robothy"); CompositeFuture future = CompositeFuture.all(f1, f2); future.map(results -> { // results 是一个 CompositeFuture Double balance1 = results.resultAt(0); Double balance2 = results.resultAt(1); return balance1 + balance2; }).onSuccess(totalBalance -> System.out.println("Total balance is " + totalBalance)) .onFailure(Throwable::printStackTrace);
其中 queryBalance 的方法签名:
Future<Double> queryBalance(String bank, String username);
queryBalance 返回的是 Future,CompositeFuture#all()
把两个 Future 包装了起来,返回一个 CompositeFuture,当两个 Future 都成功完成时,这个 CompositeFuture 才算成功完成;随后转换操作 map 所设置的同步函数得以执行,通过 resultAt 方法读取结果,并将结果相加,作为总余额返回。此外,两个 queryBalance 并行执行。
上面这个例子只从 2 个银行获取余额,实际上,每个客户开会行的数量是不定的,一个客户通常对应着一个银行列表。对于这种情况,可以将多个 Future 放到一个 List 当中,再通过 CompositeFuture#all(List<Future> futures)
方法对一个 Future 列表进行包装。
List<String> banks = Arrays.asList("bank1", "bank2", "bank3", ...); List<Future> futureList = banks.stream().map(bank -> queryBalance(bank, "Robothy")) .collect(Collectors.toList()); CompositeFuture.all(futureList) .map(results -> { Double totalBalance = 0.0; List<Double> balanceList = results.list(); for (Double balance : balanceList) { totalBalance += balance; } return totalBalance; }) .onSuccess(totalBalance -> System.out.println("Total balance is " + totalBalance)) .onFailure(Throwable::printStackTrace);
上面代码中,客户 "Robothy" 对应着一个银行列表,通过流操作把银行列表转化为 List<Future> futureList
。随后通过 CompositeFuture#all
对这个 Future 列表进行包装,当列表中的 Future 全部成功完成时,all 返回的 CompositeFuture 才算成功完成。在读取结果的时候,这里使用了 list()
方法一次性读取所有 Future 的结果。
CompositeFuture 不仅能够协调结果类型相同的多个 Future,还可以协调结果类型不同的 Future。例如:下面例子以并行的方式获取用户信息和账户信息。
Future<User> f1 = getUserByName("Robothy"); Future<Account> f2 = getAccountByName("Robothy"); CompositeFuture.all(f1, f2) .onSuccess(results -> { User user = resuts.resultAt(0); Account account = results.resultAt(1); }) .onFailure(Throwable::printStackTrace);
通过 CompositeFuture#all
包装得到 Future 列表都已成功完成时,all()
返回的 CompositeFuture 才算完成;当其中一个 Future 失败时,CompositeFuture 立即失败。
join 是另一种协调方式,当它包装的所有 Future 都已成功完成时,join()
返回的 CompositeFuture 才算成功完成。当其中一个 Future 失败,CompositeFuture 在等待所有 Future 都(成功或失败)完成之后失败。
下面这个例子的目的是找到网络延迟最小的服务器。
List<String> hosts = Arrays.asList("10.0.1.123", "10.0.1.124", "10.0.1.125", ...); // 异步函数 ping 将 host 映射为 Future<Long> List<Future> futures = hosts.stream().map(host -> ping(host)).collect(Collectors.toList()); CompositeFuture cf = CompositeFuture.join(futures); cf.onComplete(res -> { // 这里是 onComplete,无论 cf 成功还是失败,都将执行。 CompositeFuture delays = res.result(); Long minDelay = Long.MAX_VALUE; int minDelayIdx = -1; for (int i=0; i<delays.size(); i++) { Long delay = delays.resultAt(i); if (null != delay) { // 第 i 个 Future 成功完成(如果失败, delay 为 null) if (delay < minDelay) { minDelay = delay; minDelayIdx = i; } } } if (minDelayIdx != -1) { System.out.println("Min delay: " + minDelay + ", server: " + hosts.get(minDelayIdx)); } else { System.out.println("All servers are unreachable."); } }) .onFailure(Throwable::printStackTrace);
join 和 all 的区别在于:当包装的 Future 列表中有 1 个失败时,all()
得到的 CompositeFuture 立即失败,而join()
所包装的 CompositeFuture 会等待列表中的所有 Future 都完成时才失败。
此外,CompositeFuture 包装了多个 Future,意味着可能会有多个失败的 Future,而 Future#onFailure
只能够处理一个异常对象。这个异常对象是失败 Future 中,索引号最小的 Throwable 对象,并非最先失败的 Future 对应的 Throwable。例如下面这个例子总是输出 "f1 error"。
CompositeFuture.join( Future.future(promise -> promise.fail(new RuntimeException("f1 error"))), Future.future(promise -> promise.fail(new RuntimeException("f2 error"))) ) .onFailure(cause -> System.err.println(cause.getMessage())); // 总是输出 f1 error }
要处理每个异常对象,需要在 onComplete 设置的处理器中进行操作。
CompositeFuture.join(f1, f2, ...) .onComplete(res -> { CompositeFuture results = res.result(); for (int i=0; i<results.size(); i++) { Throwbale cause = results.cause(i); if (null != cause) { // 第 i 个 Future 失败了 cause.printStackTrace(); } } });
all 和 join 都需要在列表中所有的 Future 都成功的情况下,CompositeFuture 才算成功,而 any 只要有一个 Future 成功了,CompositeFuture 就会立即成功完成;当所有 Future 都失败了,CompositeFuture 才是失败。
下面这个例子表示客户端向一个分布式系统的多个副本节点发送相同的消息,只要有一个节点返回成功,则表示消息发送成功。
String msg = "Hello"; List<String> hosts = Arrays.asList("10.0.0.1", "10.0.0.2", ...); List<Future> futures = hosts.stream().map(host -> send(host, msg)).collect(Collectors.toList()); CompositeFuture.any(futures) .onSuccess(results -> { for (int i=0; i<results.size(); i++) { if (null != results.resultAt(i)) { System.out.println(hosts.get(i) + " received message."); } } }) .onFailure(Throwable::printStackTrace);
CompositeFuture 作为 Future 的子接口,和普通 Future 一样可以处理消息和转化为另一个 Future 的能力。特殊的是,它的消息类型也是 CompositeFuture,它的状态随着所包装的若干 Future 状态的变化而变化。
CompositFuture 包装 Future 的方式或者说协调方式有三种:all, join, any。需要根据不同的应用场景来选择不同的包装方式。