とある技術者の研鑽結果

SIerのお仕事でたまった鬱憤を最新テクノロジー勉強で晴らすためのブログです

Sparkソースコードリーディング(2) - confBroadcast(01)

引き続きtextFile

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

textFileではhadoopFile関数の返り値を使用しているため、この関数を調査

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))

とあるようにhadoopFileでは冒頭にbroadcast関数をCall

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

これがbroadcast関数
1行目を追いかけるとHttpBroadCastFactory内でHttpBraodCastインスタンスを返している
2行目のcleanerだが

  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  }

となっている
ここでspark.cleaner.referenceTrackingというSparkConfの値を取得しているのだが
このKeyがマニュアルに載っていない。。。なんだこれ?
まあ設定していなかったらtrueなのでSome(ContextCleaner)を返すのであろう

今日はここまで
次回はcleaner.foreacheを紐解こう