Spark源代码分析——谈RDD和依赖关系
我们知道RDD在Spark中是一个特别重要的概念。可以说,Spark的所有逻辑都需要依赖RDD。在本文中,我们简要讨论了Spark中的RDD。Spark中RDD的定义如下:
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { def this(@transient oneParent: RDD[_]) = this(oneParent.context, List(new OneToOneDependency(oneParent))) }
每个RDD包含以下五个属性:
- 此 RDD 的分区列表
- 每个数据文件的计算函数
- 对其他 RDD S 的依赖
- 区域选择器(可选)
- 每个数据文件的位置信息(可选)
为了更好地理解,这里我们使用HDFS上常见的HDFS实现:Hadoop RDD的实现。
我们先来看看Hadoop RDD是如何获取分区信息的:
override def getPartitions: Array[Partition] = { val jobConf = getJobConf() SparkHadoopUtil.get.addCredentials(jobConf) try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { case e: InvalidInputException if ignoreMissingFiles => Array.empty[Partition] } }
如你所见,hadoop RDD获取的是对应数据的底层文件信息,即hadoop中的块信息,然后一个块文件就是一个分区。这里,HadoopRDD对应的分区信息被封装到hadoop分区中:
trait Partition extends Serializable { def index: Int override def hashCode(): Int = index override def equals(other: Any): Boolean = super.equals(other) } private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit) extends Partition { val inputSplit = new SerializableWritable[InputSplit](s) override def hashCode(): Int = 31 * (31 + rddId) + index override def equals(other: Any): Boolean = super.equals(other) def getPipeEnvVars(): Map[String, String] = { val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) { val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit] // since it's not removed yet Map("map_input_file" -> is.getPath().toString(), "mapreduce_map_input_file" -> is.getPath().toString()) } else { Map() } envVars } }
它主要包含几个信息:
- RDD 的编号
- 分区的序列号
- 与分区对应的文件块
对于依赖项,由于此处未设计 Shuffle,因此不存在依赖项。构造实例化时,传入的依赖项列表为空。 如果我们在RDD中进行相关计算,例如reparation,则该方法实现如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") if (shuffle) { val distributePartition = (index: Int, items: Iterator[T]) => { var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions) items.map { t => position = position + 1 (position, t) } } : Iterator[(Int, T)] new CoalescedRDD( new ShuffledRDD[Int, T, T]( mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true), new HashPartitioner(numPartitions)), numPartitions, partitionCoalescer).values } else { new CoalescedRDD(this, numPartitions, partitionCoalescer) } }
你可以看到此时返回了一个合并的RDD,其中有一个shuffled RDD,然后shuffled的RDD被套上一个MapPartitionsRDD。让我们先来看看 MapPartitionsRDD:
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false, isOrderSensitive: Boolean = false): RDD[U] = withScope { new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter), preservesPartitioning = preservesPartitioning, isOrderSensitive = isOrderSensitive) } private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, isOrderSensitive: Boolean = false) extends RDD[U](prev) { .... }
请注意,如果父RDD以这种方式构造传入的RDD[U](prev):
def this(@transient oneParent: RDD[_]) = this(oneParent.context, List(new OneToOneDependency(oneParent)))
依赖关系是一对一的依赖关系。这里,mappartitions RDD 的父 RDD 就是当前的 RDD,也就是我们分析的 Hadoop RDD
我们来看看以上五个属性
分区
其分区获取功能:
override def getPartitions: Array[Partition] = firstParent[T].partitions protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }
可以看到,当前依赖列表中第一个依赖关系对应的分区,即Hadoop RDD的分区信息,就是在这里获取的。
计算功能
MapPartitionsRDD的计算函数通过构造传入
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) // ------------------------------------------ val distributePartition = (index: Int, items: Iterator[T]) => { var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions) items.map { t => position = position + 1 (position, t) } } : Iterator[(Int, T)]
可以看到,这里进行重新分区时,根据父RDD的每个分区,根据哈希随机分解分区序列号,返回每个数据和新的分区ID。 此外,firstParent[T].iterator(split, context) 用于读取父 RDD 的数据。我们可以看看这个实现:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }
我们可以看到,这里我们回到任务执行期间读取RDD数据。这里我们实际上是Hadoop RDD。读取的数据是一个数据块一个,然后处理块数据。每个区块中的数据会根据哈希分区分散到不同的分区中。
看看shuffled的RDD。它的父RDD是上面的MapPartitionsRDD:
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( @transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner) extends RDD[(K, C)](prev.context, Nil) { ... }
分区列表返回如下:
override def getPartitions: Array[Partition] = { Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i)) }
这里的分区返回一个shuffled的rddpartition,其中包含一个分区序列号(根据传入的HashPartitioner 需要重新分区的分区数量)返回的依赖如下:
override def getDependencies: Seq[Dependency[_]] = { val serializer = userSpecifiedSerializer.getOrElse { val serializerManager = SparkEnv.get.serializerManager if (mapSideCombine) { serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]]) } else { serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]]) } } List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) }
可以看出,这里返回的是一个 ShuffleDependency。当我们之前研究阶段划分时,每当我们遇到随机依赖时,就会划分一个新的阶段。结合我们之前的分析,阶段将在这里划分。如果需要后续计算,将生成一个 ShuffleMapTask 以在 Map 结束时写入数据。 当读取数据时,将调用“ShuffleRDDpute”方法:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
以下是我们分析任务执行过程。每个执行节点将获取应由其自己的节点处理的数据。这里是获取对应Hadoop RDD对应的块数据文件,然后根据分区返回分区和每个数据对应的数据。然后每个执行节点会根据生成的数据(重新分区后)得到同一个分区的数据,把它们放在一起,写入本地临时文件。
我们正在查看最外层的合并RDD:
private[spark] class CoalescedRDD[T: ClassTag]( @transient var prev: RDD[T], maxPartitions: Int, partitionCoalescer: Option[PartitionCoalescer] = None) extends RDD[T](prev.context, Nil) { ... }
通过前面的分析,我们知道如果没有其他后续处理,会生成一个 ResultTask 对应的 ResultStage 来执行用户传入的方法。 以下是合并的依赖关系:
override def getDependencies: Seq[Dependency[_]] = { Seq(new NarrowDependency(prev) { def getParents(id: Int): Seq[Int] = partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices }) }
可以看出,返回了一个窄依赖关系。如果有其他后续处理,则会根据RDD生成新的RDD,并且只有在遇到ShuffleDependency 时才划分阶段。 这里的计算函数计算实现如下:
override def compute(partition: Partition, context: TaskContext): Iterator[T] = { partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition => firstParent[T].iterator(parentPartition, context) } }
你可以看到这里调用了结构,并且传入了 ShuffledRDD 来读取数据。至于如何实现这一点,你可以看到前面的源代码分析。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2014-09-18,如有侵权请联系 cloudcommunity@tencent 删除数据sparkintrdd函数