行动算子
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,列举一些常用的行动算子API,如表1所示。
表1 常用的行动算子API
行动算子 | 相关说明 |
---|---|
count() | 返回数据集中的元素个数 |
first() | 返回数组的第一个元素 |
take(n) | 以数组的形式返回数组集中的前n个元素 |
reduce(func) | 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素 |
collect() | 以数组的形式返回数据集中的所有元素 |
foreach(func) | 将数据集中的每个元素传递到函数func中运行 |
下面,结合具体的示例对这些行动算子API进行详细讲解。
- count()
count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.count()
res0: Long = 5
上述代码中,第1行代码创建了一个RDD对象,当arrRdd调用count()操作后,返回的结果是5,说明成功获取到了RDD数据集的元素。值得一提的是,可以将第一行代码分解成下面两行代码,具体如下:
val arr = Array(1,2,3,4,5)
val arrRdd = sc.parallelize(arr)
- first()
first()主要用于返回数组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.first()
res1: Int = 1
从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。
- take(n)
take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.take(3)
res2: Array[Int]=Array(1,2,3)
从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。
- reduce(func)
reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
在上述代码中,执行arrRdd.reduce((a,b)=>a+b)操作后返回的结果是15,说明成功的将RDD数据集中的所有元素进行求和,结果为15。
- collect()
collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.collect()
res4: Array[Int] = Array(1,2,3,4,5)
在上述代码中,执行arrRdd.collect()操作后返回的结果是Array(1,2,3,4,5),说明成功的将RDD数据集中的元素以数组的形式输出。
- foreach(func)
foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
scala> arrRdd.foreach(x => println(x))
1
2
3
4
5
在上述代码中,foreach(x => println(x))的含义是依次遍历arrRdd中的每一个元素,把当前遍历的元素赋值给变量x,并且通过println(x)打印出x的值。执行arrRdd.foreach()操作后,arrRdd中的元素被依次输出了(即RDD数据集中所有的元素被遍历输出)。这里的arrRdd.foreach(x => println(x))可以简写为arrRdd.foreach(println)。