学科分类
目录
Spark技术栈

行动算子

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,列举一些常用的行动算子API,如表1所示。

表1 常用的行动算子API

行动算子 相关说明
count() 返回数据集中的元素个数
first() 返回数组的第一个元素
take(n) 以数组的形式返回数组集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
collect() 以数组的形式返回数据集中的所有元素
foreach(func) 将数据集中的每个元素传递到函数func中运行

下面,结合具体的示例对这些行动算子API进行详细讲解。

  • count()

count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:

  scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.count()
  res0: Long = 5

上述代码中,第1行代码创建了一个RDD对象,当arrRdd调用count()操作后,返回的结果是5,说明成功获取到了RDD数据集的元素。值得一提的是,可以将第一行代码分解成下面两行代码,具体如下:

val arr = Array(12345)
val arrRdd = sc.parallelize(arr)
  • first()

first()主要用于返回数组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:

  scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.first()
  res1: Int = 1

从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。

  • take(n)

take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize  at <console>:24
  scala> arrRdd.take(3)
  res2: Array[Int]=Array(1,2,3)

从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。

  • reduce(func)

reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.reduce((a,b)=>a+b)
  res3: Int = 15

在上述代码中,执行arrRdd.reduce((a,b)=>a+b)操作后返回的结果是15,说明成功的将RDD数据集中的所有元素进行求和,结果为15。

  • collect()

collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:

  scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.collect()
  res4: Array[Int] = Array(1,2,3,4,5)

在上述代码中,执行arrRdd.collect()操作后返回的结果是Array(1,2,3,4,5),说明成功的将RDD数据集中的元素以数组的形式输出。

  • foreach(func)

foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.foreach(x => println(x))
  1
  2
  3
  4
  5

在上述代码中,foreach(x => println(x))的含义是依次遍历arrRdd中的每一个元素,把当前遍历的元素赋值给变量x,并且通过println(x)打印出x的值。执行arrRdd.foreach()操作后,arrRdd中的元素被依次输出了(即RDD数据集中所有的元素被遍历输出)。这里的arrRdd.foreach(x => println(x))可以简写为arrRdd.foreach(println)。

点击此处
隐藏目录