首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用流执行聚合(1)

使用流执行聚合(1)

“累加器反模式”第 1 部分中的第 1 个例子使用 Streams 执行了一次简单的汇总,如清单 1 所示。
清单 1. 使用 Streams                声明性地计算聚合值
1
2
3
4
5
int totalSalesFromNY
    = txns.stream()
          .filter(t -> t.getSeller().getAddr().getState().equals("NY"))
          .mapToInt(t -> t.getAmount())
          .sum();




清单 2 展示了如何采用 “老方法” 编写这个示例。
清单 2.                通过命令计算同一个聚合值
1
2
3
4
5
int sum = 0;
for (Txn t : txns) {
    if (t.getSeller().getAddr().getState().equals("NY"))
        sum += t.getAmount();
}




第 1 部分介绍了尽管新方法比老方法更长,但新方法更可取的一些原因:
关于本系列借助 java.util.stream 包,您可以简明地、声明性地表达集合、数组和其他数据源上可能的并行批量操作。在                    Java 语言架构师 Brian Goetz 编写的这个  中,全面了解 Streams 库,并学习如何最充分地使用它。

  • 代码更加清晰,因为它被简单地构造为一些简单操作的组合。
  • 该代码是通过声明进行表达的(描述想要的结果),而不是通过命令进行表达的(一个计算结果的循序渐进的过程)。
  • 随着表达的查询变得更加复杂,此方法可以更干净地扩展。
应用这个特殊的聚合是有一些额外原因的。                演示了累加器反模式,其中代码首先声明并初始化一个可变累加器变量                (sum),然后继续在循环中更新累加器。为什么这样做是不正确的?首先,此代码样式难以并行化。没有协调(比如同步),对累加器的每次访问都导致一次数据争用(而借助协调,协调导致的争用会破坏并行性所带来的效率收益)。
累加器方法更不可取的另一个原因是,它在太低的级别上建模计算 — 在各个元素的级别上,而不是在整个数据集的级别上。与                “逐个依次迭代交易金额,将每笔金额添加到一个已初始化为 0 的累加器” 相比,“所有交易金额的总和” 是目标的更抽象、更直接的陈述。
所以,如果命令式累加是错误的工具,那什么才是正确的工具?在这个特定的问题中,您已经看到了答案的线索(sum()                    方法),但这只是一个强大的、通用的缩减                技术的一种特殊情况。缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。
缩减
“缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。”

缩减(也称为折叠)是一种来自函数编程的技术,它抽象化了许多不同的累加操作。给定一个类型为 T,包含 x 个元素的非空数列                X1, x2, ..., xn 和 T 上的二元运算符(在这里表示为 *),*                下的 X 的缩减 被定义为:
   (x1 * x2 * ...*                    xn)
当使用普通的加法作为二元运算符来应用于某个数列时,缩减就是求和。但其他许多操作也可以使用缩减来描述。如果二元运算符是                “获取两个元素中较大的一个”(这在 Java 中可以使用拉姆达表达式 (x,y) -> Math.max(x,y)                来表示,或者更简单地表示为方法引用 Math::max),则缩减对应于查找最大值。
通过将累加描述为缩减,而不使用累加器反模式,可以采用更抽象、更紧凑、更并行化                的方式来描述计算 — 只要您的二元运算符满足一个简单条件:结合性。回想一下,如果 a、b 和 c                元素满足以下条件,二元运算符 * 就是结合性的
   ((a * b) * c) = (a * (b *                c))
结合性意味着分组无关紧要。如果二元运算符是结合性的,那么可以按照任何顺序安全地执行缩减。在顺序执行中,执行的自然顺序是从左向右;在并行执行中,数据划分为分段,分别缩减每个分段,然后组合结果。结合性可确保这两种方法得到相同的答案。如果将结合性的定义扩展到                4 项,可能更容易理解:
   (((a * b) * c) * d) = ((a * b) * (c *                    d))
左侧对应于典型的顺序计算;右侧对应于表示典型的并行执行的分区执行,其中输入序列被分解为几部分,各部分并行缩减,并使用 *                将各部分的结果组合起来。(或许令人惊奇的是,*                不需要是可交换的,但许多运算符通常都可用于缩减,比如相加和求最大值等。具有结合性但没有可交换性的二元运算符的一个例子是字符串串联。)
Streams 库有多种缩减方法,包括:
1
2
Optional<T> reduce(BinaryOperator<T> op)
T reduce(T identity, BinaryOperator<T> op)




在这些方法中,最简单的方法仅获得一个结合性二元运算符,在该运算符下计算流元素的缩减结果。结果被描述为                Optional;如果输入流是空的,则缩减结果也是空的。(如果输入只有一个元素,那么缩减结果就是该元素。)如果您有一个字符串集合,您可以将这些元素的串联计算为:
1
String concatenated = strings.stream().reduce("", String::concat);




对于这两种方法中的第二种方法,您需要提供一个身份值,在字符串为空时,还可以将该值用作结果。身份值必须满足所有 x 的限制:
   身份 * x = x
   x *                    身份 = x
不是所有二元运算符都有身份值,但当它们拥有身份值时,它们可能不会得到您想要的结果。例如,计算最大值时,您可能倾向于使用值                Integer.MIN_VALUE                作为身份(它确实满足要求)。但在空流上使用该身份时,结果可能不是您想要的;您无法确定空输入和仅包含                Integer.MIN_VALUE 的非空输入之间的区别。(有时这不是问题,但有时会导致问题 — 因此                Streams 库将留给客户,由客户决定是否指定身份。)
对于字符串串联,身份是空字符串,所以您可以将前面的示例重写为:
1
String concatenated = strings.stream().reduce("", String::concat);




类似地,您可以将数组上的整数总和描述为:
1
int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);




(但实际上,您使用了 IntStream.sum() 便捷方法。)
缩减不需要仅应用于整数和字符串,它可以应用于您想要将元素序列缩减为该类型的单个元素的任何情形。例如,您可以通过缩减来计算最高的人:
1
2
3
Comparator<Person> byHeight = Comparators.comparingInt(Person::getHeight);
BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight);
Optional<Person> tallest = people.stream().reduce(tallerOf);




如果提供的二元运算符不是结合性的,或者提供的身份值实际上不是该二元运算符的身份,那么在并行执行该操作时,结果可能是错的,而且同一个数据集上的不同执行过程可能会生成不同的结果。
返回列表