spark的shuffle阶段和mr的shuffle阶段
(1)问题分析
对hadoop和spark的shullfe过程理解。
不管是mr的shuffle过程,还是spark的shuffle过程,答案都比较多,考验面试者总结能力。
Spark的shuffle和Mr的shuffle也是经典的对比问题。
(2)核心问题讲解
MapReduce shuffle
mr的shuffle 分为map的shuffle和reduce 的shuffle。
Map的Shuffle
数据存到hdfs中是以块进行存储的,每一个块对应一个分片,maptask就是从分片中获取数据的 。
在某个节点上启动了map Task,map Task读取是通过k-v来读取的,读取的数据会放到环形缓存区,这样做的目的是为了防止IO的访问次数,然后环形缓存区的内存达到一定的阀值的时候会把文件益写到磁盘,溢出的各种小文件会合并成一个大文件,这个合并的过程中会进行排序,这个排序叫做归并排序
map阶段会涉及到:
1)sort排序(默认按字典排序)
2)合并(combiner合并)
3)文件合并(merage 合并 总共有三种,默认是内存到磁盘)
4)压缩(设置压缩就会执行)
reduce Shuffle
归并排序完成后reduce端会拉取map端的数据,拉取的这个过程叫做copy过程,拉取的数据最终会合并成一个文件,GroupComparator(默认,这个我们也可以自定义)是专门对文件夹里面的key进行分组的。
然后就形成k-List(v1,v2,v3)的形式,然后reduce经过业务处理,最终输出到hdfs,可以设置是否进行压缩。
reduce阶段会涉及到:
1)sort排序
2)分组(将相同的key的value放到一个容器的过程)
3)merage文件合并
Spark shuffle
与MapReduce完全不一样的是,MapReduce它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。
但是Spark不需要,spark默认情况下,是不会对数据进行排序的。因此ShuffleMapTask每写入一点数据,ResultTask就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。spark这种机制的好处在于,速度比mapreduce快多了。
但是也有一个问题,mapreduce提供的reduce,是可以处理每个key对应的values的,很方便;但是spark中,由于这种实时拉取的机制,因此提供不了,直接处理key对应的values的算子,只能通过groupByKey,先shuffle,有一个MapPartitionsRDD,然后用map算子,来处理每个key对应的values。这样就没有mapreduce的计算模型那么方便。
(3)问题扩展
SparkSql调优
spark.hadoopRDD.ignoreEmptySplits
默认是false,如果是true,则会忽略那些空的splits,减小task的数量。
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task.
spark.sql.autoBroadcastJoinThreshold && spark.sql.broadcastTimeout
用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力,而broadcastTimeout是用于控制Broadcast的Future的超时时间,默认是300s,可根据需求进行调整。
spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
该参数是用于开启spark的自适应执行,这是spark比较老版本的自适应执行,后面的targetPostShuffleInputSize是用于控制之后的shuffle 阶段的平均输入数据大小,防止产生过多的task。
intel大数据团队开发的adaptive-execution相较于目前spark的ae更加实用,该特性也已经加入到社区3.0之后的roadMap中,令人期待。
spark.sql.parquet.mergeSchema
默认false。当设为true,parquet会聚合所有parquet文件的schema,否则是直接读取parquet summary文件,或者在没有parquet summary文件时候随机选择一个文件的schema作为最终的schema。
spark.sql.files.opencostInBytes
该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
1或者2,默认是1. MapReduce详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。