Sparkソースコードリーディング(5) - spark.textFile関数最後
textFile関数の最後
hadoopFIle関数の最後はHadoopRDDをnewして終了
new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path)
ただnewしているだけですが興味深い点を2点
まずはinputFormatClass
これ中身は classOf[TextInputFormat]となっています
Hadoopをやっている方にはおなじみのTextInputFormatさんですね
HadoopRDDなのでHadoopからTextをとるために指定していると
次にkeyClass,ValueClass
これはSparkというよりはScalaの文法なのですが
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
という形でコンストラクタに渡されています
これを見るにkeyClassはMapのKeyの型に、valueClassはMapのValueの型に指定されるわけです
外部からHadoopRDDが格納する型を指定できると
Scala、面白い。。。
で、ここまででようやくtextFileに戻ってきました
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
最後詰めです
後はhadoopFile関数が返したHadoopRDDに対して、mapを噛ましてRDD[String]を返している訳です
map(pair => pair._2.toString) という書き方がScala特有ですね
例えば
scala> val mm = Map(1->"a", 2->"b") scala> mm.foreach(pair => println(pair._2).toString()) a b
といった形で_1でKey, _2でValueが取れる訳です
HadoopのTextはKeyに行No.(だっけか?)、Valueに実際のテキスト値が入っているので、ここでテキスト値をRDD[String]につめ直しています
そんなこんなでHadoopのClassを使いつつHDFSのテキストファイルを取得しているのがtextFIle関数という訳でした