学科分类
目录
Spark技术栈

持久化机制

在Spark中,RDD是采用惰性求值,即每次调用行动算子操作,都会从头开始计算。然而,每次调用行动算子操作,都会触发一次从头开始的计算,这对于迭代计算来说,代价是很大的,因为迭代计算经常需要多次重复的使用同一组数据集,所以,为了避免重复计算的开销,可以让Spark对数据集进行持久化。

通常情况下,一个RDD是由多个分区组成的,RDD中的数据分布在多个节点中,因此,当持久化某个RDD时,每一个节点都将把计算分区的结果保存在内存中,若对该RDD或衍生出的RDD进行其他行动算子操作时,则不需要重新计算,直接去取各个分区保存数据即可,这使得后续的行动算子操作速度更快(通常超过10倍),并且缓存是Spark构建迭代式算法和快速交互式查询的关键。

RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。每一个持久化的RDD都可以使用不同的存储级别存储,从而允许持久化数据集在硬盘或者内存作为序列化的Java对象,甚至可以跨节点复制。

persist()方法的存储级别是通过StorageLevel对象(Scala、Java、Python)设置的。

cache()方法的存储级别是使用默认的存储级别(即StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存))。接下来,通过一张表介绍一下持久化RDD的存储级别,如表1所示。

表1 持久化RDD的存储级别

存储级别 相关说明
MEMORY_ONLY 默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算
MEMORY_AND_DISK 将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取
MEMORY_ONLY_SER 将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时,更占CPU
MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER类似,但是将当内存放不下则溢出到磁盘,而不是每次需要时重新计算它们
DISK_ONLY 仅将RDD分区全部存储到磁盘上
MEMORY_ONLY_2 MEMORY_AND_DISK_2 与上面的级别相同。若加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上
OFF_HEAP(实验性) 与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中(这需要启用堆外内存)

在表1中,列举了持久化RDD的存储级别,我们可以在RDD进行第一次算子操作时,根据自己的需求选择对应的存储级别。

为了大家更好地理解,接下来,通过代码演示如何使用persist()方法和cache()方法对RDD进行持久化。

  1. 使用persist()方法对RDD进行持久化

定义一个列表list,通过该列表创建一个RDD,然后通过persist持久化操作和算子操作统计RDD中的元素个数以及打印输出RDD中的所有元素。具体代码如下:

 1  scala> import org.apache.spark.storage.StorageLevel
 2  import org.apache.spark.storage.StorageLevel
 3  scala> val list = List("hadoop","spark","hive")
 4  list: List[String] = List(hadoop, spark, hive)
 5  scala> val listRDD = sc.parallelize(list)
 6  listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
 7                        parallelize at <console>:27
 8  scala> listRDD.persist(StorageLevel.DISK_ONLY)
 9  res1: listRDD.type = ParallelCollectionRDD[0] at parallelize at <console>:27
 10 scala> println(listRDD.count())
 11 3
 12 scala> println(listRDD.collect().mkString(","))
 13 hadoop,spark,hive

上述代码中,第1行代码导入StorageLevel对象的包;第3行代码定义了一个列表list;第5行代码执行sc.parallelize(list)操作,创建了一个RDD,即listRDD;第8行代码添加了persist()方法,用于持久化RDD,减少I/O操作,提高计算效率;第10行代码执行listRDD.count()行动算子操作,将统计listRDD中元素的个数;第12行代码执行listRDD.collect()行动算子操作和mkString(“,”)操作,将listRDD中的所有元素进行打印输出,并且是以逗号为分隔符。

需要注意的是,当程序执行到第8行代码时,并不会持久化listRDD,因为listRDD还没有被真正计算;当执行第10行代码时,listRDD才会进行第一次的行动算子操作,触发真正的从头到尾的计算,这时listRDD.persist()方法才会被真正的执行,把listRDD持久化到磁盘中;当执行到第12行代码时,进行第二次的行动算子操作,但不触发从头到尾的计算,只需使用已经进行持久化的listRDD来进行计算。

  1. 使用cache()方法对RDD进行持久化

定义一个列表list,通过该列表创建一个RDD,然后通过cache持久化操作和算子操作统计RDD中的元素个数以及打印输出rdd中的所有元素。具体代码如下:

 1  scala> val list= List("hadoop","spark","hive")
 2  list: List[String] = List(hadoop, spark, hive)
 3  scala> val listRDD= sc.parallelize(list)
 4  listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
 5                        parallelize at <console>:26
 6  scala> listRDD.cache()
 7  res2: listRDD.type = ParallelCollectionRDD[1] at parallelize at <console>:26
 8  scala> println(listRDD.count())
 9  3
 10 scala> println(listRDD.collect().mkString(","))
 11 hadoop,spark,hive

上述代码中,第6行代码对listRDD进行持久化操作,即添加cache()方法,用于持久化RDD,减少I/O操作,提高计算效率。然而,使用cache()方法进行持久化操作,底层是调用了persist(MEMORY_ONLY)方法,用来对RDD进行持久化。当程序当执行到第6行代码时,并不会持久化listRDD,因为listRDD还没有被真正计算;当程序执行第8行代码时,listRDD才会进行第一次的行动算子操作,触发真正的从头到尾的计算,这时listRDD.cache()方法才会被真正的执行,把listRDD持久化到内存中;当程序执行到第10行代码时,进行第二次的行动算子操作,但不触发从头到尾的计算,只需使用已经持久化的listRDD来进行计算。

点击此处
隐藏目录