学科分类
目录
Spark技术栈

RDD的分区

在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。

RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism这个参数值来配置默认的分区数目。一般而言,各种模式下的默认分区数目如下:

  • Local模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N。
  • Standalone或者Yarn模式:在“集群中所有CPU核数总和”和“2”这两者中取较大值作为默认值。
  • Mesos模式:默认的分区数是8。

Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。需要注意的是,RDD的分区函数是针对(Key,Value)类型的RDD,分区函数根据Key对RDD元素进行分区。因此,当需要对一些非(Key,Value)类型的RDD进行自定义分区时,需要先把RDD元素转换为(Key,Value)类型,再通过分区函数进行分区操作。

如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org.apache.spark.Partitioner类,并实现其中的3个方法,具体如下:

  1. def numPartitions:Int:用于返回创建的分区个数。

  2. def getPartition(Key:Any):用于对输入的Key做处理,并返回该Key的分区ID,分区ID的范围是0~numPartitions-1。

  3. equals(other:Any):用于Spark判断自定义的Partitioner对象和其他的Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。其中,equals()方法中的参数other表示其他的Partitioner对象,该方法的返回值是一个Boolean类型,当返回值为true时表示自定义的Partitioner对象和其他Partitioner对象相同,则两个RDD的分区方式也是相同的;反之,自定义的Partitioner对象和其他Partitioner对象不相同,则两个RDD的分区方式也不相同。

点击此处
隐藏目录