狠狠撸

狠狠撸Share a Scribd company logo
今日からはじめる
  GPars
      2011-06-17
第16回 G*ワークショップ

 吉田 健太郎 (@fumokmm)
自己紹介
名前:吉田 健太郎(ふも)
仕事:プログラマ!
Twitter:@fumokmm
はてな:id:fumokmm
ブログ:No Programming, No Life
    http://d.hatena.ne.jp/fumokmm/
活動:
●   JGGUG運営委員
●   Grailsドキュメント会参加
●   G*Magazine vol.2「もし新人女子Javaプログラマが
    『Groovyイン?アクション』を読んだら」
GParsとはなにか




 Groovy Parallel Systems
http://gpars.codehaus.org/
   読み方は「じーぱーず」
GParsとはなにか
GParsはGroovyベースの並列処理ライブラリ群
他の言語の並列処理で使われている技術のいいとこど
りをしていったらすごいことになってしまったライブ
ラリ。
ほとんどJavaで書かれてるから、速度は問題ない
Javaからも使える。(若干面倒だけど)
Groovy v1.8から本体にバンドルされるようになった
のはGPars v0.11 で、最新版はv0.12。
v1.8な人はインポートするだけで使える。
そうじゃない人は、@Grabればいい。
@Grab('org.codehaus.gpars:gpars:0.12')
本日のメニュー

非同期コレクション
?


アクター
エージェント
データフロー
       まとめ
非同期コレクション
並列コレクションとは
コレクションに対する並列処理を簡単に提供。

並列コレクションはJavaの並列処理ライブラリ
のラッパーを提供。
●
    並列コレクションは JSR 166
●
    Fork/Join は JSR 166y (Java 7)
●
    Map/Filter/Reduce
並列コレクションなハロワ
import static groovyx.gpars.GParsPool.*
def numbers = [1, 2, 3, 4, 5, 6]
def squares = [1, 4, 9, 16, 25, 36]
withPool {
  assert squares
    == numbers.collectParallel { it * it }
}

                             parallel_collection01.groovy


まず、並列処理したい部分をwithPoolで囲む。

ここの例では parallel に collect している。
parallelにeachしてみる
import static groovyx.gpars.GParsPool.*
def numbers = 0..9
withPool {
  numbers.eachParallel {
    print it
  }
}
                             parallel_collection02.groovy


 出力例(毎回異なる):

 0213756489
 Result: 0..9
parallelメソッド対応表
通常のメソッドと withPool内で
使えるようになる parallel なメソッドの対応表
withPoolについて
import static groovyx.gpars.GParsPool.*
withPool(10) {
  // 10スレッド分スレッドが入るプールを確保
  // この中で自由にスレッドを泳がせてね
}




withPoolはデフォルトではコア数+1のスレッドを使う。
第一引数でスレッド数を指定可能。
例) Core 2 な場合、デフォルトではスレッド数は3。
makeTransparent
import static groovyx.gpars.GParsPool.*
withPool {
  def numbers =
       [1, 2, 3, 4, 5, 6].makeTransparent()
  def squares = [1, 4, 9]
  assert squares ==
  numbers.collect{ it * it }
         .grep{ it < 10 }
}
                             parallel_collection03.groovy

.makeTransparent() しておくと、いちいち ?Parallel
しなくて済むからちょっとだけシンプルに書ける。

※注意※ v0.12から名前が .makeConcurrent() に変更に
なっています。
map/filter/reduce
import static groovyx.gpars.GParsPool.*
withPool {
  assert 30 ==
    [1, 2, 3, 4,   5, 6, 7, 8, 9, 10].parallel
    .filter { it   <= 5 }
    .map    { it   * 2 }
    .reduce { a,   b -> a + b }
}
                               parallel_collection04.groovy



コレクションに .parallel を付けると、
関数型っぽくmap/filter/reduceできるようになります。
尘补辫/蹿颈濒迟别谤/谤别诲耻肠别メソッド対応表
例:文字数カウント
import static groovyx.gpars.GParsPool.*
def words = "This is just a plain text to
count words in"
print count(words)
def count(arg) {
  withPool {
    return arg.parallel
      .map{[it, 1]}
      .groupBy{it[0]}.getParallel()
      .map {it.value=it.value.size();it}
      .sort{-it.value}
      .collection
} }
                             parallel_collection05.groovy

結果はラップされているので、最後に .collection を付ける
必要がある。.groupByなどたまに対応してないメソッドが
あるので、.getParallel() しなおしてあげる必要がある。
例:スリープソート
import static groovyx.gpars.GParsPool.*
import java.util.concurrent.*
def numbers = [8, 1, 4, 9, 3, 6]
def latch = new
CountDownLatch(numbers.size())

// numbersのサイズ分、スレッド確保
withPool(numbers.size() ?: 1){
  numbers.eachParallel {
    latch.countDown()
    latch.await() // 全部の準備が終わるまで待つ
    Thread.sleep((it as int) * 10)
    println it
  }
}
                                                 sleepSortSample.groovy
        cf. http://d.hatena.ne.jp/fumokmm/20110611/1307811572
例:フィボナッチ数 (Fork/Join)
import static groovyx.gpars.GParsPool.*
def fibo(num) {
  withPool {
    runForkJoin(num) { n ->
      switch (n) {
        case 0..1: return n
        default:
          forkOffChild(n - 1)
          forkOffChild(n - 2)
          [*childrenResults].flatten().sum()
} } } }
assert fibo(10) == 55
                               fibonacciSample.groovy

JSR 166yのfork/join機能をGParsのラッパーから使う例。
M/F/R vs Fork/Join
本日のメニュー

非同期コレクション
アクター
?



エージェント
データフロー
       まとめ
アクター
アクターとは
          メッセージ




             返信




         基本ステップ
アクターはそれぞれ、独立したスレッド(世界)で動く。

自分とは別のアクターとやり取りするには、メッセージを送る。
メッセージを受け取ったアクターはメッセージに返信することが
できる。
アクターなハロワ
import static groovyx.gpars.actor.Actors.*
def console = actor {
  loop {
    react { msg ->
      println msg
    }
  }
}
console.send('Hello World!')
                                   actor01.groovy


アクターを生成するには、actor { … } DSLを使う。
loopはstop()されるまで、繰り返される。
最後に、consoleアクターに 'Hello World!' という String
をメッセージとして送信している。
.send の別記法
console.send('Hello World!')
console << 'Hello World!'    // << でもOK
console.call('Hello World!') // .callでもOK
console 'Hello World!'
                   // .call=クロージャ呼び出し



メッセージ送信のためのメソッド .send には別記法が
用意されている。(種類は上記コード参照)

ErlangやScalaなどで使う ! は Groovyではオーバーロード
できないので違う記法を採用している。
メインスレッドがアクターに送信
import static groovyx.gpars.actor.Actors.*
def console = actor {
  loop {
    react { msg ->
      println msg
    }
  }
}
// 出力順はばらばら
console << 'Hello World!'
println 'finish.'
                                   actor02.groovy
メインスレッドがアクターに送信
import static groovyx.gpars.actor.Actors.*
def console = actor {
  loop {
    react { msg ->
      println msg
    }
  }
}
console << 'Hello World!'

console.stop() // <- consoleのアクターをストップ
console.join() // <- メインスレッドで、console
         //          アクターが終わるまで待つ
println 'finish.'
                                   actor03.groovy
高度なloop
def whileTrue = {->
  // 繰り返し条件を返すクロージャ(真の間繰り返す)
}
def printResult = {->
  // ループ終了時に呼び出されるクロージャ
}
loop(whileTrue, printResult) {
  // アクターに行わせる処理
}
                         actor04.groovy


loopの第一引数に繰り返し条件(真の間繰り返し)を返す
クロージャやint(繰り返し回数)を指定することでループ
回数を制御できる。
第二引数にクロージャを渡すと、loop終了時に実行される
(フック処理)
reactor (リアクター)
import static groovyx.gpars.actor.Actors.*
def console = actor {
  loop {
    react { msg ->
      println msg
    }
  }
}
//   同じ
def console2 = reactor { msg   println msg }
                                   actor05.groovy

loop-react の組み合わせはよく使うので
用意された便利DSL。リアクション専用。
sender(送り主)
react { tweet ->
  if (isSpam(tweet)) {
    ignoreTweetsFrom sender // 通報
  }
  sender.send '二度と送って来ないでね!'
}



senderはactorのクロージャ内部で、送信元を特定する
のに使える。
reply(返信)
import static groovyx.gpars.actor.Actors.*
def replyingActor = reactor { msg ->
  println "受信: $msg"
  reply "<<$msg>>"
}
def reply =
  replyingActor.sendAndWait('メッセージ 1')
assert reply == '<<メッセージ1>>'
                                   actor06.groovy

ここでは、msgを受け取って、<< と >> で囲ったものを
返却している。

.sendAndWait はメッセージを送信した後、アクターから
の返信を待つ。
メッセージ振り分け(switch-case)
loop {
  react { text ->
    switch(text) {
       case String :
         reply << "あなたの送ったのはString"
         break
       case Integer:
         reply << "あなたの送ったのはInteger"
         break
    }
  }
}

一番単純な振り分け。受け取ったメッセージを
switch-case式を用いて振り分けている。
Groovyのswitch-caseはそれなりに強力なのでこれでも
十分強力かもしれないが、もっといい方法がある。
メッセージ振り分け(when)
import static groovyx.gpars.actor.Actors.*
def handler = messageHandler {
  when { String message -> reply '文字列' }
  when { Number message -> reply '数字' }
}

assert '文字列' ==
           handler.sendAndWait('こんにちは')
assert '数字' == handler.sendAndWait(123)
                                   actor07.groovy


MessageHandler { … } DSLを使うと when に渡すクロージャ
のバリエーションでメッセージを振り分けることができる。
比较:厂肠补濒补アクターの
        メッセージ振り分け
val badActor = actor {
  var done = false
  while (! done) {
    receive {
      case NegotiateNewContract =>
        // 処理
      case Speak(line) =>
        // 処理
      case _ =>
        // 全部マッチしなかった時の処理
    }
  }
}
badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?"
メッセージについて
GParsのメッセージはイミュータブルにする必
要はない(強制されてはいない)が、送信した
あとはむやみに触るべきではない。
他の言語(Erlang)とかはそもそもすべてがイ
ミュータブルなので、ここらへんは心配しなく
てよい。
Scalaはcase classにしたりして、パターン
マッチでメッセージを受け取っている。
(1ページ前の比較を参照)
本日のメニュー

非同期コレクション
アクター
エージェント
?




データフロー
       まとめ
エージェント
エージェントとは
並列処理環境においてミュータブルなデータを
安全に取り扱うのは難しい。

そこでエージェントを使い、データの更新を
ラップする。

この機能はClojure由来。
エージェントなハロワ
import groovyx.gpars.agent.Agent
def guard = new Agent<String>()
guard { updateValue('GPars') }
guard { updateValue(it + ' is groovy!') }
assert 'GPars is groovy!' == guard.val
                                   agent01.groovy



エージェントが内包する値の更新は、クロージャを通して
行うことになる。
結果値は .val で取得できる。
リストに追加
import groovyx.gpars.agent.Agent
def members = new Agent(['Me'])
members.send {it.add 'A'}
def t1 = Thread.start {
  members.send {it.add 'B' } }
def t2 = Thread.start {
  members << { it.add 'C' }
  members { it.add 'D' }    }
[t1, t2]*.join()
println members.val
members.valAsync {println "現在のメンバ: $it" }
members.await()
                                agent02.groovy

複数スレッドから同時に更新をかけてもリストは壊れない。
エージェントが守っているから。
リスナーとバリデータを追加
import groovyx.gpars.agent.Agent
def counter = new Agent()
counter.addListener{ oldVal, newVal ->
  println "$oldVal -> $newVal"
}
counter.addValidator{ oldVal, newVal ->
  if (newVal < oldVal) throw
    new Exception('新しい値が小さいのでエラー')
}
      .
      .
      .
assert counter.hasErrors()
assert 1 == counter.errors.size()
                                agent03.groovy

値の更新時に通知したり、チェックしたりするクロージャを
追加することができる。エラーがあったかは後でチェック。
本日のメニュー

非同期コレクション
アクター
エージェント
データフロー
?




       まとめ
データフロー
データフローとは
タスクという単位に処理を切り分けて記述して
ゆく。

タスクの順番的な依存関係はデータフローがい
い感じに解決してくれる。
データフローなハロワ
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.*
def flow = new DataFlows()
task { flow.result = flow.x + flow.y } //
task { flow.x = 10 } //
task { flow.y = 5 } //
assert 15 == flow.result //
                                     dataflow01.groovy


①、②、③の順番で代入が実行される。
②の flow.result は ①の flow.x, flow.y が代入されるまで待機
する
③の flow.result は ①の flow.result が代入されるまで待機する
デッドロックにご用心
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.*

def flow = new DataFlows()

task { flow.x = flow.y }
task { flow.y = flow.x } // デッドロック!

println flow.x // 帰ってこない
                                     dataflow02.groovy


デッドロックが発生する例。

flow.x のようにしなければ、帰ってくる。
本日のメニュー

非同期コレクション
アクター
エージェント
データフロー
       まとめ
      ?
まとめ
GParsでだいたいどんなことができるかわかっ
て頂けたかと思います。
エージェントとデータフローの内容が薄いのは
まとめきれてないだけです???(反省)本当
はもっと多機能なので、ご興味が湧かれた方は
ぜひ調べてみて下さい。
GParsで日常の小さな部分からコツコツと簡単
に並列処理化をはじめてみませんか?
まとめ
GParsは
クロージャ、DSL、MOPなどの
Groovy機能を駆使して
JVMの強力な並列処理機能を
シンプルに使いこなす
Groovy流のやり方だった
参考
●
    Groovy in Action, Second Edition
    http://www.manning.com/koenig2/

●
    GPars Javadoc
    http://gpars.org/0.12/javadoc/

●
    GPars ユーザガイド&リファレンス
    http://gpars.org/0.12/guide/index.html
告知:GParsドキュメント翻訳中
 現在GParsユーザガイド(v0.12)の翻訳作業を
 Google Translator Toolkit上で行っています。
      http://goo.gl/d1LHM


 参加者:ふも(@fumokmm)
     杉浦さん(@touchez_du_bois)


 協力者大募集中!
   @fumokmm までご連絡下さい。
ご清聴
ありがとうございました

More Related Content

今日からはじめる骋笔补谤蝉