SparkRDD论文总结

本篇文章是对SparkRDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。

RDD

Motivation

传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象——RDD。

设计思想及特点

ResilientDistributedDataset(RDD)是ApacheSpark中数据的核心抽象,它是一种只读的、分区的数据记录集合。

RDD的特点:

Lazyevaluation,只在需要的时候才进行计算

RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算

Resilient:借助RDDlineagegraph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance

那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:

valrdd=sc.parallelize(1to)

valresult=rdd.map(_+10)

.filter(_15)

.map(x=(x,1))

.reduceByKey(_+_)

.collect

并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。

RDD的表示

Spark中的RDD主要包含五部分信息:

partitions():partition集合

dependencies():当前RDD的dependency集合

iterator(split,context):对每个partition进行计算或读取操作的函数

partitioner():分区方式,如HashPartitioner和RangePartitioner

preferredLocations(split):访问某个partition最快的节点

所有的RDD都继承抽象类RDD。几种常见的操作:

sc#textFile:生成HadoopRDD,代表可以从HDFS中读取数据的RDD

sc#parallelize:生成ParallelCollectionRDD,代表从Scala集合中生成的RDD

map,flatMap,filter:生成MapPartitionsRDD,其partition与parentRDD一致,同时会对parentRDD中iterator函数返回的数据进行对应的操作(lazy)

union:生成UnionRDD或PartitionerAwareUnionRDD

reduceByKey,groupByKey:生成ShuffledRDD,需要进行shuffle操作

cogroup,join:生成CoGroupedRDD

Operations

Spark里面对RDD的操作分为两种:transformation和action。

transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作

action会依次执行计算操作并且得到结果

这些transformation和action在FP中应该是很常见的,如map,flatMap,filter,reduce,count,sum。

对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:

implicitdefrddToPairRDDFunctions[K,V](rdd:RDD[(K,V)])

(implicitkt:ClassTag[K],vt:ClassTag[V],ord:Ordering[K]=null):PairRDDFunctions[K,V]={

newPairRDDFunctions(rdd)

}

Dependency

上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineagegraph(依赖图)。Dependency大致分两种:narrowdependency和widedependency。

Narrowdependency(NarrowDependency):ParentRDD中的每个partition最多被childRDD中的一个partition使用,即一对一的关系。比如map,flatMap,filter等transformation都是narrowdependency

Widedependency(ShuffleDependency):ParentRDD中的每个partition会被childRDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是widedependency(不同的partitioner)

论文中的图例很直观地表示了RDD间的依赖关系:

这样划分dependency的原因:

Narrowdependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而widedependency必须要等所有的parentRDD的结果都准备好以后再执行计算

Narrowdependency失败以后,Spark只需要重新计算失败的parentRDD即可;而对于widedependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算

Shuffle

Spark中的shuffle操作与MapReduce中类似,在计算widedependency对应的RDD的时候(即ShuffleMapStage)会触发。

首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组maptask来将每个分区写入到临时文件中,然后下一个stage端(reducetask)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在mapside也可能在reduceside进行)。

Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。

Shufflewrite(maptask):SortShuffleWriter#write

Shuffleread(reducetask):ShuffleRDD#







































北京看白癜风哪间医院最权威
寻常性白癜风



转载请注明:http://www.beicanshijie.com/sxgs/577.html