Spark如何防止内存溢出

问题分析

考官主要考核你是否真的有spark的使用经验,以及使用spark的过程中如何解决一些常见的内存溢出问题,这对于以后工作中解决问题非常重要。

核心答案讲解

Spark使用过程中会有多种内存溢出的情况,即包括driver端的内存溢出,map对象过多内存溢出,数据不平衡内存溢出,shuffle后内存溢出以及standalone资源分配不均匀导致的内存溢出。

  • driver端的内存溢出

可以增大driver的内存参数:Spark.driver.memory (default 1g)
这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。

  • map过程产生大量对象导致内存溢出

这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。
例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。

  • 数据不平衡导致内存溢出

数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。

  • shuffle后内存溢出

shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。
在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过Spark.default.parallelism控制(在Spark-sql中用Spark.sql.shuffle.partitions) , Spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用Spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。

  • standalone模式下资源分配不均匀导致内存溢出

在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 这两个参数,但是没有配置--executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置--executor-cores或者Spark.executor.cores参数,确保Executor资源分配均匀。

使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()。rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算。而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

问题扩展

使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()。
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算。
而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗些IO读写时间。

项目应用

在工作中,我们尽量要避免出现内存溢出的情况,而不是等待出现问题后再去解决,否则就会影响公司的业务绩效。因此,我们提到的配置项等解决方法,应该在开发和部署上线时,提前配置,防患于未然。

点击此处
隐藏目录