持久化机制
在Spark中,RDD是采用惰性求值,即每次调用行动算子操作,都会从头开始计算。然而,每次调用行动算子操作,都会触发一次从头开始的计算,这对于迭代计算来说,代价是很大的,因为迭代计算经常需要多次重复的使用同一组数据集,所以,为了避免重复计算的开销,可以让Spark对数据集进行持久化。
通常情况下,一个RDD是由多个分区组成的,RDD中的数据分布在多个节点中,因此,当持久化某个RDD时,每一个节点都将把计算分区的结果保存在内存中,若对该RDD或衍生出的RDD进行其他行动算子操作时,则不需要重新计算,直接去取各个分区保存数据即可,这使得后续的行动算子操作速度更快(通常超过10倍),并且缓存是Spark构建迭代式算法和快速交互式查询的关键。
RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。每一个持久化的RDD都可以使用不同的存储级别存储,从而允许持久化数据集在硬盘或者内存作为序列化的Java对象,甚至可以跨节点复制。
persist()方法的存储级别是通过StorageLevel对象(Scala、Java、Python)设置的。
cache()方法的存储级别是使用默认的存储级别(即StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存))。接下来,通过一张表介绍一下持久化RDD的存储级别,如表1所示。
表1 持久化RDD的存储级别
存储级别 | 相关说明 |
---|---|
MEMORY_ONLY | 默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算 |
MEMORY_AND_DISK | 将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取 |
MEMORY_ONLY_SER | 将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时,更占CPU |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER类似,但是将当内存放不下则溢出到磁盘,而不是每次需要时重新计算它们 |
DISK_ONLY | 仅将RDD分区全部存储到磁盘上 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 | 与上面的级别相同。若加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上 |
OFF_HEAP(实验性) | 与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中(这需要启用堆外内存) |
在表1中,列举了持久化RDD的存储级别,我们可以在RDD进行第一次算子操作时,根据自己的需求选择对应的存储级别。
为了大家更好地理解,接下来,通过代码演示如何使用persist()方法和cache()方法对RDD进行持久化。
- 使用persist()方法对RDD进行持久化
定义一个列表list,通过该列表创建一个RDD,然后通过persist持久化操作和算子操作统计RDD中的元素个数以及打印输出RDD中的所有元素。具体代码如下:
1 scala> import org.apache.spark.storage.StorageLevel
2 import org.apache.spark.storage.StorageLevel
3 scala> val list = List("hadoop","spark","hive")
4 list: List[String] = List(hadoop, spark, hive)
5 scala> val listRDD = sc.parallelize(list)
6 listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
7 parallelize at <console>:27
8 scala> listRDD.persist(StorageLevel.DISK_ONLY)
9 res1: listRDD.type = ParallelCollectionRDD[0] at parallelize at <console>:27
10 scala> println(listRDD.count())
11 3
12 scala> println(listRDD.collect().mkString(","))
13 hadoop,spark,hive
上述代码中,第1行代码导入StorageLevel对象的包;第3行代码定义了一个列表list;第5行代码执行sc.parallelize(list)操作,创建了一个RDD,即listRDD;第8行代码添加了persist()方法,用于持久化RDD,减少I/O操作,提高计算效率;第10行代码执行listRDD.count()行动算子操作,将统计listRDD中元素的个数;第12行代码执行listRDD.collect()行动算子操作和mkString(“,”)操作,将listRDD中的所有元素进行打印输出,并且是以逗号为分隔符。
需要注意的是,当程序执行到第8行代码时,并不会持久化listRDD,因为listRDD还没有被真正计算;当执行第10行代码时,listRDD才会进行第一次的行动算子操作,触发真正的从头到尾的计算,这时listRDD.persist()方法才会被真正的执行,把listRDD持久化到磁盘中;当执行到第12行代码时,进行第二次的行动算子操作,但不触发从头到尾的计算,只需使用已经进行持久化的listRDD来进行计算。
- 使用cache()方法对RDD进行持久化
定义一个列表list,通过该列表创建一个RDD,然后通过cache持久化操作和算子操作统计RDD中的元素个数以及打印输出rdd中的所有元素。具体代码如下:
1 scala> val list= List("hadoop","spark","hive")
2 list: List[String] = List(hadoop, spark, hive)
3 scala> val listRDD= sc.parallelize(list)
4 listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
5 parallelize at <console>:26
6 scala> listRDD.cache()
7 res2: listRDD.type = ParallelCollectionRDD[1] at parallelize at <console>:26
8 scala> println(listRDD.count())
9 3
10 scala> println(listRDD.collect().mkString(","))
11 hadoop,spark,hive
上述代码中,第6行代码对listRDD进行持久化操作,即添加cache()方法,用于持久化RDD,减少I/O操作,提高计算效率。然而,使用cache()方法进行持久化操作,底层是调用了persist(MEMORY_ONLY)方法,用来对RDD进行持久化。当程序当执行到第6行代码时,并不会持久化listRDD,因为listRDD还没有被真正计算;当程序执行第8行代码时,listRDD才会进行第一次的行动算子操作,触发真正的从头到尾的计算,这时listRDD.cache()方法才会被真正的执行,把listRDD持久化到内存中;当程序执行到第10行代码时,进行第二次的行动算子操作,但不触发从头到尾的计算,只需使用已经持久化的listRDD来进行计算。