とある技術者の研鑽結果

SIerのお仕事でたまった鬱憤を最新テクノロジー勉強で晴らすためのブログです

Sparkソースコードリーディング(5) - spark.textFile関数最後

textFile関数の最後
hadoopFIle関数の最後はHadoopRDDをnewして終了

    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)

ただnewしているだけですが興味深い点を2点

まずはinputFormatClass
これ中身は classOf[TextInputFormat]となっています
Hadoopをやっている方にはおなじみのTextInputFormatさんですね
HadoopRDDなのでHadoopからTextをとるために指定していると

次にkeyClass,ValueClass
これはSparkというよりはScalaの文法なのですが

class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)

という形でコンストラクタに渡されています
これを見るにkeyClassはMapのKeyの型に、valueClassはMapのValueの型に指定されるわけです
外部からHadoopRDDが格納する型を指定できると
Scala、面白い。。。

で、ここまででようやくtextFileに戻ってきました

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

最後詰めです

後はhadoopFile関数が返したHadoopRDDに対して、mapを噛ましてRDD[String]を返している訳です
map(pair => pair._2.toString) という書き方がScala特有ですね
例えば

scala> val mm = Map(1->"a", 2->"b")
scala> mm.foreach(pair => println(pair._2).toString())
a
b

といった形で_1でKey, _2でValueが取れる訳です

HadoopのTextはKeyに行No.(だっけか?)、Valueに実際のテキスト値が入っているので、ここでテキスト値をRDD[String]につめ直しています


そんなこんなでHadoopのClassを使いつつHDFSのテキストファイルを取得しているのがtextFIle関数という訳でした