Sparkソースコードリーディング(3) - broadcast
broadcast関数続き
def broadcast[T: ClassTag](value: T): Broadcast[T] = { val bc = env.broadcastManager.newBroadcast[T](value, isLocal) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }
次はcleaner
前回調査した通りcleanerはOption型のContextCleanerを返す
よってforeachはOptionのforeachで、値があれば処理するあれですね
で_(アンダースコア)はcleanerの中身なので、ContextCleanerであり、registerBroadcastForCleanupもそのメソッドですね
何をしているかというと
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) { registerForCleanup(broadcast, CleanBroadcast(broadcast.id)) } /** Register an object for cleanup. */ private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) { referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue) }
とあるようにbroadcastをBufferに格納していると
ContextCleanerの役割が
/** * An asynchronous cleaner for RDD, shuffle, and broadcast state. * * This maintains a weak reference for each RDD, ShuffleDependency, and Broadcast of interest, * to be processed when the associated object goes out of scope of the application. Actual * cleanup is performed in a separate daemon thread. */
とあるようにこれから実行しようとしてる命令の初期化処理を実施するため、Queueに突っ込んでいるイメージといって過言ではないかなあ?
まとめるとbroadcast関数は
1. broadcastするためのspark.HttpBroadcastを取得する
2. ただし、その前段処理としてcleanupつまり配信周りを初期化する
という役割ということでおおむね間違ってなさそう
時間をみてcleanupの具体的な中身を追うとしよう
まずはtextFileの続きを追わねば