狠狠撸

狠狠撸Share a Scribd company logo
入门肠辞谤别.补蝉测苍肠
core.async勉強会
2015/4/19@HaLake
@athos0220
アジェンダ
core.asyncとは
コアコンセプト
基本的な础笔滨の使い方
ちょっと进んだトピック
肠辞谤别.补蝉测苍肠とは?
肠辞谤别.补蝉测苍肠とは?
非同期プログラミング用ライブラリ
CSPライクなチャネル通信をベースとしている
Go言語のgoroutineにインスパイアされたgoブ
ロックを提供
コアコンセプト
チャネル
並行に動くタスク間で値を受け渡す通信路
producer channel consumer
チャネル
並行に動くタスク間で値を受け渡す通信路
producer channel consumer
チャネル
並行に動くタスク間で値を受け渡す通信路
producer channel consumer
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
バッファがいっぱいだったら?
チャネル
並行に動くタスク間で値を受け渡す通信路
producer consumer
実際にはバッファを持ったキュー
バッファがいっぱいだったら? バッファが空だったら?
協調スレッド(IOCスレッド)
以下のような場合に制御が別のスレッドに切り替わる
値を受信しようとしたときにバッファが空
値を送信しようとしたときにバッファがいっぱい
IOC = Inversion of Control
(go
(while true
(let [v (<! ch)]
(println v))))
(go
(loop [i 0]
(>! ch i)
(recur (inc i))))
協調スレッド(IOCスレッド)
以下のような場合に制御が別のスレッドに切り替わる
値を受信しようとしたときにバッファが空
値を送信しようとしたときにバッファがいっぱい
IOC = Inversion of Control
(go
(while true
(let [v (<! ch)]
(println v))))
(go
(loop [i 0]
(>! ch i)
(recur (inc i))))
上記の場合には??が実行されずに制御が切り替わる(パーク)
協調スレッド(IOCスレッド)
グリーンスレッド(ネイティブスレッドでない)ので生成の
コストが低い
シングルスレッドの場合でも使える(ClojureScriptでも!)
native thread native thread native thread
IOCthread
IOCthread
IOCthread
IOCthread
IOCthread
IOCthread
IOCthread
IOCthread
IOCthread
基本的な础笔滨の使い方
チャネル生成と送受信
チャネル生成:chan
送信:>!, >!!
受信:<!, <!!
(let [ch (chan 2)]
(>!! ch 0)
(>!! ch 1)
[(<!! ch) (<!! ch)])
;=> [0 1]
チャネル生成と送受信
チャネル生成:chan
送信:>!, >!!
受信:<!, <!!
(let [ch (chan 2)]
(>!! ch 0)
(>!! ch 1)
[(<!! ch) (<!! ch)])
;=> [0 1]
バッファのサイズを指定
チャネル生成と送受信
ブロッキング ノンブロッキング
送信 >!!
バッファが一杯ならブロック
>!
バッファが一杯ならパーク
受信 <!!
バッファが空ならブロック
<!
バッファが空ならパーク
チャネル生成と送受信
ブロッキング ノンブロッキング
送信 >!!
バッファが一杯ならブロック
>!
バッファが一杯ならパーク
受信 <!!
バッファが空ならブロック
<!
バッファが空ならパーク
後述のgoマクロ内でのみ使用可
goマクロ
協調スレッドで処理を実行する
(let [ch (chan 2)]
(go (while true
(let [v (<! ch)]
(println “received” v))))
(>!! ch “hi”)
(>!! ch “there”))
;; received hi
;; received there
timeout
(let [t (timeout 100)
begin (System/currentTimeMillis)]
(<!! t)
(println “Waited”
(- (System/currentTimeMillis)
begin)))
;; Waited 100
タイムアウト用のチャネルを生成する
alts!
一度に複数のチャネルから待ち受ける
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println “received" v))))
(go (>! c1 "hi"))
(go (>! c2 "there")))
;; received hi
;; received there
alts!
一度に複数のチャネルから待ち受ける
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println “received" v))))
(go (>! c1 "hi"))
(go (>! c2 "there")))
;; received hi
;; received there
待ち受けるチャネル
alts!
一度に複数のチャネルから待ち受ける
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println “received" v))))
(go (>! c1 "hi"))
(go (>! c2 "there")))
;; received hi
;; received there
待ち受けるチャネル
受信した値と受信元のチャネルが返る
alts! + timeout
受信を待ち受ける時間にタイムアウトを設定
(go
(let [c (chan)
begin (System/currentTimeMillis)]
(alts! [c (timeout 100)])
(println "Gave up after”
(- (System/currentTimeMillis)
begin)))
バッファいろいろ
buffer
一杯になったらパーク or ブロック
dropping-buffer
一杯になったら追加しようとした値をドロップ
sliding-buffer
一杯になったら最初に追加した値をドロップ
ちょっと进んだトピック
ちょっと进んだトピック
チャネル間の連携
高レベルAPI
deprecated API vs transducers
チャネル間の連携
pipe
mult/tap
pub/sub mix/admix
値を振り分ける“トピック”を
あらかじめ指定しておく
高レベルAPI
チャネルによる値の送受信をClojure標準のシー
ケンス処理にみなしたAPI
内部でgoマクロを利用してチャネル間での値の
受け渡しを勝手にやってくれる
高レベルAPIを組み合わせることでgoマクロを
使った低レベルな記述をする機会を減らせる
高レベルAPI
その他、into, merge, reduce, take 等が用意
されている
(defn split [p ch]
(let [tc (chan), fc (chan)]
(go (loop []
(let [v (<! ch)]
(if (nil? v)
(do (close! tc) (close! fc))
(when (>! (if (p v) tc fc) v)
(recur))))))
[tc fc]))
c.c.a/splitの定義(一部簡略化)
deprecated API vs transducers
高レベルAPIで提供されるような、チャネルか
ら得られる値を加工したいケースは多い
channel ??? channel
deprecated API vs transducers
core.asyncではチャネル用のmapや?lterも提
供していた
(let [ch (to-chan (range))]
[(<!! ch) (<!! ch) (<!! ch)])
;=> [0 1 2]
(let [ch (->> (to-chan (range))
(map< inc)
(map< #(* % %)))]
[(<!! ch) (<!! ch) (<!! ch)])
;=> [1 4 9]
deprecated API vs transducers
Clojure 1.7以降ではtransducerを使って同様の処理が書ける
チャネル生成時にtransducerを指定するとチャネルから取得
できる値はtransducerを適用した結果の値になる
(let [xform (comp (map inc)
(map #(* % %)))
ch (pipe (to-chan (range))
(chan 1 xform))]
[(<!! ch) (<!! ch) (<!! ch)])
;=> [1 4 9]
transducer
まとめ
core.asyncはチャネルベースの非同期プログラ
ミング用ライブラリ
goマクロを使うことで協調スレッドが利用でき
ClojureScriptでも並行に動くタスクを記述可能

More Related Content

入门肠辞谤别.补蝉测苍肠