Sparkソースコードリーディング(2) - confBroadcast(01)
引き続きtextFile
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
textFileではhadoopFile関数の返り値を使用しているため、この関数を調査
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
とあるようにhadoopFileでは冒頭にbroadcast関数をCall
def broadcast[T: ClassTag](value: T): Broadcast[T] = { val bc = env.broadcastManager.newBroadcast[T](value, isLocal) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }
これがbroadcast関数
1行目を追いかけるとHttpBroadCastFactory内でHttpBraodCastインスタンスを返している
2行目のcleanerだが
private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } }
となっている
ここでspark.cleaner.referenceTrackingというSparkConfの値を取得しているのだが
このKeyがマニュアルに載っていない。。。なんだこれ?
まあ設定していなかったらtrueなのでSome(ContextCleaner)を返すのであろう
今日はここまで
次回はcleaner.foreacheを紐解こう