狠狠撸

狠狠撸Share a Scribd company logo
Core.async & 
transducers 
@ktsujister
こんな処理があるとします 
? input: file 
? 一行ごとにIDが記載されている 
? 処理A 
? 受け取ったIDを元に、URLを作成 
? 処理B 
? 受け取ったURLを元に、外部サイトからjsonをHTTP GET 
? 処理C 
? 受け取ったjsonをパースして出力ファイルに書き込む
lazy-seqベースのフロー 
;;; lazy-seq based flow 
(defn proc-a [id] 
;; 処理A 
url) 
(defn proc-b [url] 
;; 処理B 
json 
) 
(defn proc-c [json] 
;; 処理C 
) 
(->> (io/reader "ids.txt") 
line-seq 
(map proc-a) 
(map proc-b) 
(map proc-c))
弱点 
? cpu-coreが複数あっても、一個しか忙しくない 
? http getを待っている間に処理が行われない
core.async + transducer 
;;; core.async + transducer based flow 
(let [in-ch (->> (io/reader "ids.txt") 
line-seq 
a/to-chan) 
xf (comp (map proc-a) 
(map proc-b) 
(map proc-c)) 
out-ch (chan) 
worker-count 4] 
(a/pipeline worker-count out-ch xf in-ch)) 
複数のスレッドで処理
core.async + transducer 2 
;;; core.async + transducer based flow 
(let [in-ch (->> (io/reader "ids.txt") 
line-seq 
a/to-chan) 
a-xf (map proc-a) 
b-xf (map proc-b) 
c-xf (map proc-c) 
a-worker-count 8 
a-out-ch (chan a-worker-count) 
b-worker-count 4 
b-out-ch (chan b-worker-count) 
c-worker-count 2 
c-out-ch (chan c-worker-count)] 
(a/pipeline a-worker-count a-out-ch a-xf in-ch) 
(a/pipeline b-worker-count b-out-ch b-xf a-out-ch) 
(a/pipeline-blocking c-worker-count c-out-ch c-xf b-out-ch) 
) 
処理ごとにワーカ数を指定可能
transducer 
? http://clojure.org/transducers 
? clojure 1.7以降 
? map系変換処理をcomposeできる 
? transducerが作れるfunction 
? map cat mapcat filter remove take take-while take-nth drop drop-while replace 
partition-by partition-all keep keep-indexed dedupe random-sample 
? core.async supports transducers! 
? clojure.coreのみで使用する場合は、transduceで実行
http://youtu.be/6mTbuzafcII?t=44m8s
References 
? https://gist.github.com/ktsujister/ 
67b68c059e8b6b540cf3 
? http://clojure.org/transducers 
? https://www.youtube.com/watch?v=6mTbuzafcII

More Related Content

What's hot (20)

PDF
Custom Scan API - PostgreSQL Unconference #3 (18-Jan-2014)
Kohei KaiGai
?
PDF
160713
robo_lab
?
PDF
贵濒耻别苍迟诲へようこそ
Manabu Shinsaka
?
PDF
高スループットなサーバアプリケーションの為の新しいフレームワーク?「厂别补蝉迟补谤」
Takuya ASADA
?
PDF
Openresty
ogawatti
?
PPTX
蝉别谤惫别谤蝉辫别肠を使用したサーバ设定テストの実例
Koichi Shimozono
?
PDF
Hol012 windowsコンテナー始動
Tech Summit 2016
?
PDF
WDD2012_SC-004
Kuninobu SaSaki
?
PPTX
Javaで簡単にgpgpu aparapi
Ken'ichi Sakiyama
?
PDF
骋碍贰で半年运用してみた
Katsutoshi Nagaoka
?
PPTX
ConfD で Linux にNetconfを喋らせてみた
Akira Iwamoto
?
PDF
Hachioji pm 21
Yusuke Hosokoshi
?
PDF
Nuxt.js + microCMS + netlify
ogawatti
?
PPTX
贵濒耻别苍迟诲の使い方
Tomohiro Goya
?
PDF
毎秒2000搁别辩耻别蝉迟を捌く笔别谤濒製颁惭厂の内部构造(顿别产颈补苍サーバ1台にて)
nabe-abk
?
PDF
OpenStack + Common Lisp
irix_jp
?
PDF
Capistrano
Yasuharu Fukuda
?
PDF
Serverspecを自分好みにアレンシ? スクリーンショットて?証跡保存を撲滅-
Daisuke Ikeda
?
PDF
No SSH (@nojima; KMC関東例会)
京大 マイコンクラブ
?
PDF
颁别辫丑と骋濒耻蝉迟别谤次期ハ?ーシ?ョンて?の新机能
Emma Haruka Iwao
?
Custom Scan API - PostgreSQL Unconference #3 (18-Jan-2014)
Kohei KaiGai
?
160713
robo_lab
?
贵濒耻别苍迟诲へようこそ
Manabu Shinsaka
?
高スループットなサーバアプリケーションの為の新しいフレームワーク?「厂别补蝉迟补谤」
Takuya ASADA
?
Openresty
ogawatti
?
蝉别谤惫别谤蝉辫别肠を使用したサーバ设定テストの実例
Koichi Shimozono
?
Hol012 windowsコンテナー始動
Tech Summit 2016
?
WDD2012_SC-004
Kuninobu SaSaki
?
Javaで簡単にgpgpu aparapi
Ken'ichi Sakiyama
?
骋碍贰で半年运用してみた
Katsutoshi Nagaoka
?
ConfD で Linux にNetconfを喋らせてみた
Akira Iwamoto
?
Hachioji pm 21
Yusuke Hosokoshi
?
Nuxt.js + microCMS + netlify
ogawatti
?
贵濒耻别苍迟诲の使い方
Tomohiro Goya
?
毎秒2000搁别辩耻别蝉迟を捌く笔别谤濒製颁惭厂の内部构造(顿别产颈补苍サーバ1台にて)
nabe-abk
?
OpenStack + Common Lisp
irix_jp
?
Capistrano
Yasuharu Fukuda
?
Serverspecを自分好みにアレンシ? スクリーンショットて?証跡保存を撲滅-
Daisuke Ikeda
?
No SSH (@nojima; KMC関東例会)
京大 マイコンクラブ
?
颁别辫丑と骋濒耻蝉迟别谤次期ハ?ーシ?ョンて?の新机能
Emma Haruka Iwao
?

Recently uploaded (9)

PDF
Forguncy 10 製品概要資料 - ノーコードWebアプリ開発プラットフォーム
フォーガンシー
?
PDF
論文紹介:AutoPrompt: Eliciting Knowledge from Language Models with Automatically ...
Toru Tamaki
?
PDF
安尾 萌, 藤代 裕之, 松下 光範. 協調的情報トリアージにおけるコミュニケーションの影響についての検討, 第11回データ工学と情報マネジメントに関する...
Matsushita Laboratory
?
PPTX
Vibe Codingを始めよう ?Cursorを例に、ノーコードでのプログラミング体験?
iPride Co., Ltd.
?
PPTX
勉強会_ターミナルコマント?入力迅速化_20250620. pptx. .
iPride Co., Ltd.
?
PDF
論文紹介:Unbiasing through Textual Descriptions: Mitigating Representation Bias i...
Toru Tamaki
?
PPTX
色について.pptx .
iPride Co., Ltd.
?
PDF
安尾 萌, 北村 茂生, 松下 光範. 災害発生時における被害状況把握を目的とした情報共有システムの基礎検討, 電子情報通信学会HCGシンポジウム2018...
Matsushita Laboratory
?
PDF
安尾 萌, 松下 光範. 環境馴致を計量可能にするための試み,人工知能学会第4回仕掛学研究会, 2018.
Matsushita Laboratory
?
Forguncy 10 製品概要資料 - ノーコードWebアプリ開発プラットフォーム
フォーガンシー
?
論文紹介:AutoPrompt: Eliciting Knowledge from Language Models with Automatically ...
Toru Tamaki
?
安尾 萌, 藤代 裕之, 松下 光範. 協調的情報トリアージにおけるコミュニケーションの影響についての検討, 第11回データ工学と情報マネジメントに関する...
Matsushita Laboratory
?
Vibe Codingを始めよう ?Cursorを例に、ノーコードでのプログラミング体験?
iPride Co., Ltd.
?
勉強会_ターミナルコマント?入力迅速化_20250620. pptx. .
iPride Co., Ltd.
?
論文紹介:Unbiasing through Textual Descriptions: Mitigating Representation Bias i...
Toru Tamaki
?
色について.pptx .
iPride Co., Ltd.
?
安尾 萌, 北村 茂生, 松下 光範. 災害発生時における被害状況把握を目的とした情報共有システムの基礎検討, 電子情報通信学会HCGシンポジウム2018...
Matsushita Laboratory
?
安尾 萌, 松下 光範. 環境馴致を計量可能にするための試み,人工知能学会第4回仕掛学研究会, 2018.
Matsushita Laboratory
?
Ad

core.async+transducers Shibuya.lisp #21

  • 2. こんな処理があるとします ? input: file ? 一行ごとにIDが記載されている ? 処理A ? 受け取ったIDを元に、URLを作成 ? 処理B ? 受け取ったURLを元に、外部サイトからjsonをHTTP GET ? 処理C ? 受け取ったjsonをパースして出力ファイルに書き込む
  • 3. lazy-seqベースのフロー ;;; lazy-seq based flow (defn proc-a [id] ;; 処理A url) (defn proc-b [url] ;; 処理B json ) (defn proc-c [json] ;; 処理C ) (->> (io/reader "ids.txt") line-seq (map proc-a) (map proc-b) (map proc-c))
  • 4. 弱点 ? cpu-coreが複数あっても、一個しか忙しくない ? http getを待っている間に処理が行われない
  • 5. core.async + transducer ;;; core.async + transducer based flow (let [in-ch (->> (io/reader "ids.txt") line-seq a/to-chan) xf (comp (map proc-a) (map proc-b) (map proc-c)) out-ch (chan) worker-count 4] (a/pipeline worker-count out-ch xf in-ch)) 複数のスレッドで処理
  • 6. core.async + transducer 2 ;;; core.async + transducer based flow (let [in-ch (->> (io/reader "ids.txt") line-seq a/to-chan) a-xf (map proc-a) b-xf (map proc-b) c-xf (map proc-c) a-worker-count 8 a-out-ch (chan a-worker-count) b-worker-count 4 b-out-ch (chan b-worker-count) c-worker-count 2 c-out-ch (chan c-worker-count)] (a/pipeline a-worker-count a-out-ch a-xf in-ch) (a/pipeline b-worker-count b-out-ch b-xf a-out-ch) (a/pipeline-blocking c-worker-count c-out-ch c-xf b-out-ch) ) 処理ごとにワーカ数を指定可能
  • 7. transducer ? http://clojure.org/transducers ? clojure 1.7以降 ? map系変換処理をcomposeできる ? transducerが作れるfunction ? map cat mapcat filter remove take take-while take-nth drop drop-while replace partition-by partition-all keep keep-indexed dedupe random-sample ? core.async supports transducers! ? clojure.coreのみで使用する場合は、transduceで実行
  • 9. References ? https://gist.github.com/ktsujister/ 67b68c059e8b6b540cf3 ? http://clojure.org/transducers ? https://www.youtube.com/watch?v=6mTbuzafcII