とある技術者の研鑽結果

SIerのお仕事でたまった鬱憤を最新テクノロジー勉強で晴らすためのブログです

OpenCV - SIFTを使った特徴量算出及びマッチング

Web+DB Express Vol.83を購入したところ特集として画像認識が

いやあWeb技術者もComputer Visionが必要な時代かあ。。。と思い読み進めると、Javaでのコーディング例も載っていてかなり実用的でいい感じ

しかしJavaよりかはPythonでお手軽にコーディングしたいよね!ってことで、掲載のと同じSIFTを使った特徴量算出及びマッチング、画像表示をPythonで書いてみました

import cv2

# 画像を読み込み白黒化する
im = cv2.imread('test.jpg')
im2 = cv2.imread('test2.jpg')
gray= cv2.cvtColor(im,cv2.COLOR_BGR2GRAY)
gray2= cv2.cvtColor(im2,cv2.COLOR_BGR2GRAY)
	
# SIFT特徴量算出アルゴリズムモジュールを呼び出す		
sift = cv2.SIFT()

# 特徴量を算出する
kp ,des = sift.detectAndCompute(gray, None)
kp2 ,des2 = sift.detectAndCompute(gray2, None)

# 類似点を算出する
matcher = cv2.DescriptorMatcher_create("FlannBased")
matches = matcher.match(des,des2)

# 類似点を表示する
img3 = cv2.drawMatches(gray,kp,gray2,kp2,matches[:10])

簡単、簡単♪と実行してみてると

AttributeError: 'module' object has no attribute 'drawMatches'

は????
え、python経由でdrawMatchesって使えないの???

ネットの海をあさった結果、なんか使えなさそうな雰囲気(詳しい人がいたら教えてほしいです…)
というわけでStackOverflowを参考にdrawMatchesを自作することに

import numpy as np
import cv2

def drawMatches(img1, kp1, img2, kp2, matches):
 
    rows1 = img1.shape[0]
    cols1 = img1.shape[1]
    rows2 = img2.shape[0]
    cols2 = img2.shape[1]

    out = np.zeros((max([rows1,rows2]),cols1+cols2,3), dtype='uint8')

    out[:rows1,:cols1,:] = np.dstack([img1, img1, img1])

    out[:rows2,cols1:cols1+cols2,:] = np.dstack([img2, img2, img2])

    for mat in matches:

        img1_idx = mat.queryIdx
        img2_idx = mat.trainIdx

        (x1,y1) = kp1[img1_idx].pt
        (x2,y2) = kp2[img2_idx].pt

        cv2.circle(out, (int(x1),int(y1)), 4, (255, 0, 0), 1)   
        cv2.circle(out, (int(x2)+cols1,int(y2)), 4, (255, 0, 0), 1)

        cv2.line(out, (int(x1),int(y1)), (int(x2)+cols1,int(y2)), (255, 0, 0), 1)

    cv2.imshow('Matched Features', out)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

# 画像を読み込み白黒化する
im = cv2.imread('test.jpg')
im2 = cv2.imread('test2.jpg')
gray= cv2.cvtColor(im,cv2.COLOR_BGR2GRAY)
gray2= cv2.cvtColor(im2,cv2.COLOR_BGR2GRAY)
	
# SIFT特徴量算出アルゴリズムモジュールを呼び出す		
sift = cv2.SIFT()

# 特徴量を算出する
kp ,des = sift.detectAndCompute(gray, None)
kp2 ,des2 = sift.detectAndCompute(gray2, None)

# 類似点を算出する
matcher = cv2.DescriptorMatcher_create("FlannBased")
matches = matcher.match(des,des2)

# 類似点を表示する
img3 = drawMatches(gray,kp,gray2,kp2,matches[:10])

これで動作は問題ないみたい
drawKeypointsはあるのになあ

Sparkソースコードリーディング(5) - spark.textFile関数最後

textFile関数の最後
hadoopFIle関数の最後はHadoopRDDをnewして終了

    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)

ただnewしているだけですが興味深い点を2点

まずはinputFormatClass
これ中身は classOf[TextInputFormat]となっています
Hadoopをやっている方にはおなじみのTextInputFormatさんですね
HadoopRDDなのでHadoopからTextをとるために指定していると

次にkeyClass,ValueClass
これはSparkというよりはScalaの文法なのですが

class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)

という形でコンストラクタに渡されています
これを見るにkeyClassはMapのKeyの型に、valueClassはMapのValueの型に指定されるわけです
外部からHadoopRDDが格納する型を指定できると
Scala、面白い。。。

で、ここまででようやくtextFileに戻ってきました

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

最後詰めです

後はhadoopFile関数が返したHadoopRDDに対して、mapを噛ましてRDD[String]を返している訳です
map(pair => pair._2.toString) という書き方がScala特有ですね
例えば

scala> val mm = Map(1->"a", 2->"b")
scala> mm.foreach(pair => println(pair._2).toString())
a
b

といった形で_1でKey, _2でValueが取れる訳です

HadoopのTextはKeyに行No.(だっけか?)、Valueに実際のテキスト値が入っているので、ここでテキスト値をRDD[String]につめ直しています


そんなこんなでHadoopのClassを使いつつHDFSのテキストファイルを取得しているのがtextFIle関数という訳でした

OpenCV - HoG特徴量の算出

とある野望に向けてPythonを使い画像解析
まずはHoG特徴量を算出したい
HoG特徴量が何かというのはググっていただければと思うが、とにかくこれを算出することによって画像の類似度をはかりたいと考えている

まずmacOpenCVを導入っと

brew tap homebrew/science
brew install opencv
sudo ln -s /usr/local/Cellar/opencv/2.4.9/lib/python2.7/site-packages/* /Library/Python/2.7/site-packages/

窓さんと比較してなんと簡単なことか。。。

で画像のHoG特徴量を算出します
別に難しいことは何もなく

hog=cv2.HOGDescriptor()
img=cv2.imread('test.jpg')
res=hog.compute(img)

これだけ
printすると

[[ 0.19031134]
 [ 0.05503516]
 [ 0.01024156]
 ..., 
 [ 0.16606231]
 [ 0.28762984]
 [ 0.28762984]]

ほうほういい感じ

Sparkソースコードリーディング(4) - hadoop_FileInputFormat

戻ってhadoopFile関数

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)

次はsetInputPathsFunc
ここはScalaっぽい、というか関数型っぽい書き方で関数定義をしているよう

FileInputFormat自体はHadoopメソッド(これ)

Sets the given comma separated paths as the list of inputs for the map-reduce job.

とあるようにカンマ区切りでHDFSのパスを受け取る
HDFSのファイルを取得する際に必要なものですね

textFileで指定したパスがここでhadoopに渡されるという流れでしょう

さて次は本丸のRDDだ!

Sparkソースコードリーディング(3) - broadcast

broadcast関数続き

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

次はcleaner
前回調査した通りcleanerはOption型のContextCleanerを返す
よってforeachはOptionのforeachで、値があれば処理するあれですね

で_(アンダースコア)はcleanerの中身なので、ContextCleanerであり、registerBroadcastForCleanupもそのメソッドですね

何をしているかというと

 def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
    registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
  }

  /** Register an object for cleanup. */
  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
    referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
  }

とあるようにbroadcastをBufferに格納していると

ContextCleanerの役割が

/**
 * An asynchronous cleaner for RDD, shuffle, and broadcast state.
 *
 * This maintains a weak reference for each RDD, ShuffleDependency, and Broadcast of interest,
 * to be processed when the associated object goes out of scope of the application. Actual
 * cleanup is performed in a separate daemon thread.
 */

とあるようにこれから実行しようとしてる命令の初期化処理を実施するため、Queueに突っ込んでいるイメージといって過言ではないかなあ?

まとめるとbroadcast関数は

1. broadcastするためのspark.HttpBroadcastを取得する
2. ただし、その前段処理としてcleanupつまり配信周りを初期化する

という役割ということでおおむね間違ってなさそう
時間をみてcleanupの具体的な中身を追うとしよう
まずはtextFileの続きを追わねば

Sparkソースコードリーディング(2) - confBroadcast(01)

引き続きtextFile

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

textFileではhadoopFile関数の返り値を使用しているため、この関数を調査

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))

とあるようにhadoopFileでは冒頭にbroadcast関数をCall

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

これがbroadcast関数
1行目を追いかけるとHttpBroadCastFactory内でHttpBraodCastインスタンスを返している
2行目のcleanerだが

  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  }

となっている
ここでspark.cleaner.referenceTrackingというSparkConfの値を取得しているのだが
このKeyがマニュアルに載っていない。。。なんだこれ?
まあ設定していなかったらtrueなのでSome(ContextCleaner)を返すのであろう

今日はここまで
次回はcleaner.foreacheを紐解こう

Sparkソースコードリーディング(1) - spark.textFile関数の引数

Apache Sparkソースコードリーディングを記録

まずは公式サイトに記載されているWordCountの第一文から追っていきましょう

file = spark.textFile("hdfs://...")

このsparkというのはSparkContextクラスのインスタンスなので該当メソッドを調査

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

ふむふむpathはhdfsのパスを受け取り、minPartitionsはデフォルトを使用すると
デフォルトって何という話だが

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

とあり、defaultParallelismと2の小さい方となる
defaultParallelismはTaskSchedulerImplクラス内の関数になるのだが、並列数を定義するものである
local実行なら1、それ以上の並列なら並列数分だけの数を返す

要はdefaultMinPartitionsはlocal実行なら1だし、それ以外なら2が返ってくるものと思ってよさそう