狠狠撸

狠狠撸Share a Scribd company logo
Google の基盤クローン Hadoop について  太田 一樹 <kazuki.ohta@gmail.com>
自己紹介 太田一樹 東京大学情報理工学系研究科コンピューター科学専攻石川研究室 M1 HPC 系の話 ( 並列ファイルシステム ) 個人サイト http://kzk9.net/ http://kzk9.net/blog/ 興味 OS,  ネットワーク , I/O,  分散システム OSS 的活動 I was a committer of KDE, uim, SigScheme copybench? Kernel Reporter
とは? Google の基盤ソフトウェアのクローン Google File System, MapReduce Yahoo Research  の  Doug Cutting 氏が開発 元々は Lucene のサブプロジェクト Doug の子供の持っているぬいぐるみの名前 Java で記述 !
Google 関連 参考論文  &  スライド The Google File System Sanjay Ghemawat, Howard Gobioff, and Shu-Tak Leong, SOSP 2003 MapReduce: Simplified Data Processing on Large Clusters Jeffrey Dean and Sanjay Ghemawat, SOSP 2004 Parallel Architectures and Compilation Techniques (PACT) 2006, KeyNote http://www.cs.virginia.edu/~pact2006/program/mapreduce-pact06-keynote.pdf
Hadoop 参考文献 Hadoop 公式サイト http://hadoop.apache.org/core/ Wiki:  http://wiki.apache.org/hadoop/ インストール方法?チュートリアル?プレゼン資料など Running Hadoop on Ubuntu Linux http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_(Single-Node_Cluster) http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_%28Multi-Node_Cluster%29 Hadoop, hBase で構築する大規模データ処理システム  on Codezine http://codezine.jp/a/article/aid/2448.aspx
Hadoop の開発状況
性能 Apache Hadoop wins TeraSort Benchmark! http://developer.yahoo.com/blogs/hadoop/2008/07/apache_hadoop_wins_terabyte_sort_benchmark.html 規定フォーマットの 1T データをソート 209 seconds (5G/sec, 5M/sec per node) 910 nodes, 4 dual core Xeon 2.0GHz, 1G Ether 物量作戦
性能  ( 僕が測ってみた )
関連プロジェクト hBase BigTable クローン Pig クエリ言語で MapReduce プログラムを記述 clean = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
MapReduce Motivation
問題 インターネットの爆発的普及により、非常に大規模なデータが蓄積されている 例えば Web ページのサイズを考えてみる 200 億ページ  * 20KB = 400 TB Disk 読み込み性能は 50MB/sec (SATA) 1 台では読み込むだけでも約 100 日 保存するだけでも 500G のディスクが 1000 個程度必要 このデータを効率的に処理したい
解決方法 お金 とにかく大量のマシンを用意 1000 台マシンがあれば 1 台で 400G 処理すれば ok 読み込むのに 8000 秒程度で済む
お金だけでは解決しない プログラミングが非常に困難になる プロセス起動 プロセス監視 プロセス間通信 デバッグ 最適化 故障時への対応 しかも、新しいプログラムを作る度にこれらの問題をいちいち実装する必要がある うはー めんどくせー!
既存の分散 / 並列プログラミング環境 MPI (Message Passing Interface) 並列プログラミングのためのライブラリ スパコンの世界では主流 プログラマは各プロセスの挙動を記述 通信プリミティブ (Send, Recv, All-to-All) が提供されており、それを用いてデータ通信を実現 利点 通信パターンなどをプログラマがコントロールでき、問題に対して最適なプログラムを記述する事ができる
MPI の問題点 問題点 耐障害性への考慮が少ない アプリケーションが独自にチェックポイント機能を実装 1 万台以上の環境で計算するには耐えられない 1 台が 1000 日程度で壊れるとすると、 1 日で 10 台程度壊れる 壊れる度にチェックポイントから戻すとかやってらんない RAID 組んでもそのうち壊れるので一緒 通信パターンなどを記述する作業が多くなり、実際のアルゴリズムを記述するのにたどり着くまで時間がかかる
そこで MapReduce 大体の 大規模データ処理を行う問題に特化したプログラミングモデル アルゴリズムの記述のみにプログラマが集中できる ただし世の中の問題全てに対して最適なモデルではない ライブラリ側で面倒な事を全て担当 自動的に処理を分散 / 並列化 ロードバランシング ネットワーク転送?ディスク使用効率化 耐障害性の考慮 1 ノードで失敗したら違う場所でやりなおせばいいよね MapReduce が賢くなれば、それを使う全てのプログラムが賢くなる!
MapReduce 型の処理 WordCount Grep Sort ( 適切な Partition 関数を選択する必要 ) Log Analysis Web Graph Generation Inverted Index Construction Machine Learning NaiveBayes, K-means, Expectation Maximization, etc.
Google での使用率
MapReduce Model
MapReduce の実行フロー Data Map Data Map Data Map Reduce Reduce Data Data Shuffle
MapReduce の実行フロー 入力読み込み <key, value>* Map map: <key, value>  ?  <key’, value’>* Shuffle shuffle: <key’, reducers> ? destination reducer Reduce reduce: <key’, <value’> * >  ?  <key’’, value’’>* 出力書き出し <key’’, value’’>*
MapReduce の実行フロー Data Map Data Map Data Map Reduce Reduce Data Data Shuffle <k, v>* <k, v>* <k, v>* <k, v>* ? <k’, v’>* <k’, <v’>*>* ? <k’’, v’’>* <k, v>* ? <k’, v’>* <k, v>* ? <k’, v’>* <k’, <v’>*>* ? <k’’, v’’>*
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buzz 入力文書 : doc1
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: foo doc1: foo doc1: bar doc1: bar doc1: buz
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: bar doc1: bar doc1: buz doc1: foo doc1: foo
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: bar doc1: bar doc1: buz doc1: foo doc1: foo foo: 1 foo: 1 bar: 1 foo: 1 bar: 1 buz: 1
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 foo: 1 foo: 1 bar: 1 foo: 1 bar: 1 buz: 1 bar: <1, 1> buz: <1> foo: <1, 1, 1>
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 bar: <1, 1> buz: <1> foo: <1, 1, 1>
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 foo: <1, 1, 1> bar: <1, 1> buz: <1> foo: 3 bar: 2 buz: 1
例 :  ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 bar: 2 buz: 1 foo: 3
例 :  ワードカウント 擬似コード map(string key, string value) { foreach word in value: emit(word, 1); } reduce(string key, vector<int> values) { int result = 0; for (int i = 0; I < values.size(); i++) result += values[i]; emit(key, result); }
MapReduce の特徴 データ通信 各 Map 処理、 Reduce 処理は完全に並列に実行可能 マシンを増やせばその分処理能力が増える 耐故障性 失敗した Map, Reduce 処理は他のノードで再実行される 遅い Map, Reduce 処理についても同じ ローカリティ データのある場所で計算を始めれば、ネットワークを使う必要がなくなる Moving Computation is Cheaper Than Moving Data
論文に書かれていない、僕が思う MapReduce の短所 Shuffle フェーズで大規模に通信が発生 全 Mapper <->  全 Reducer の通信 輻輳が起こって、台数が多いと進まなくなりそう 数百~数千台程度のクラスタがいくつも有る? Map Map Map Reduce Reduce Shuffle
Hadoop Architecture
Hadoop の中身 Hadoop Distributed File System (HDFS) GFS のクローン MapReduce プログラムの入力や出力に使用 Hadoop MapReduce MapReduce 実現するためのサーバー , ライブラリ
Hadoop Distributed File System Master/Slave  アーキテクチャ ファイルはブロックという単位に分割して保存 NameNode Master ファイルのメタデータ ( パス?権限など ) を管理 DataNode Slave 実際のデータ ( ブロックを管理 )
From: http://hadoop.apache.org/core/docs/current/hdfs_design.html
データ配置のアルゴリズム (1) あるデータをレプリケーション付きで書き込みたいとき、どのノードに配置するか? 転送量を少なく なるべく安全に  ( 異なるラック?異なる DC)
レプリケーションのアルゴリズム (2) Hadoop は結構適当にやっている 1 つ目は必ずローカルに書く 2 つ目は異なるラックに書く 3 つ目は同じラックの違うノードに書く 4 つ目移行はランダム 意外とこういう適当なのが 上手く行くの かもしれない
GFS に有って HDFS に無いもの Atomic な Append HADOOP-1700 http://issues.apache.org/jira/browse/HADOOP-1700 ロック GFS では Chubby と呼ばれる分散ロックサービスを使用している Hadoop には Chubby 相当のものがないので、ファイルの上書きは出来ず、新規作成しか出来ない
Hadoop MapReduce Master/Slave  アーキテクチャ JobTracker Master Job を Task に分割し、 Task を各 TaskTracker に分配 Job: MapReduce プログラムの実行単位 Task: MapTask, ReduceTask 全ての Task の進行状況を監視し、死んだり遅れたりした Task は別の TaskTracker で実行させる TaskTracker Slave JobTracker にアサインされた Task を実行 実際の計算処理を行う
MapReduce Architecture JobTracker TaskTracker
Map フェーズ 分割された入力を読み込み、 map プログラムを動かす Partitioner( 通常は Hash) を使用して宛先を決定 バッファサイズが閾値を越えたらメモリ上でソートしてディスクに書き出す すべてが終わったらソートされたものをさらに外部マージソート (k, v) Reducer1 宛て Reducer2 宛て
Reduce フェーズ Map フェーズの出力をフェッチ メモリ上でキー毎にまとめあげる Reduce プログラムを動かす 出力を HDFS に書き出し Amazon S3 などにも書き出せる
Hadoop HDFS Manual
HDFS の操作方法 # ls alias dfsls='~/hadoop/bin/hadoop dfs -ls‘ # ls -r alias dfslsr='~/hadoop/bin/hadoop dfs -lsr‘ # rm alias dfsrm='~/hadoop/bin/hadoop dfs -rm‘ # rm -r alias dfsrmr='~/hadoop/bin/hadoop dfs -rmr' # cat alias dfscat='~/hadoop/bin/hadoop dfs -cat‘ # mkdir alias dfsmkdir='~/hadoop/bin/hadoop dfs -mkdir‘
HDFS の操作方法 hadoop@pficore:~$ dfsls Found 5 items /user/hadoop/access-log <r 3>  3003  2008-04-30 00:21 /user/hadoop/hoge  <r 3>  2183  2008-04-30 00:32 /user/hadoop/reported  <dir>  2008-04-30 00:28 /user/hadoop/wcinput  <r 3>  29  2008-05-08 10:17 /user/hadoop/wcoutput  <dir>  2008-05-08 11:48
HDFS の操作方法 HDFS 上にファイルを転送 HDFS 上からファイルを転送 alias dfsput='~/hadoop/bin/hadoop dfs -put‘ dfsput  <local-path>  <hdfs-path> alias dfsget='~/hadoop/bin/hadoop dfs -get‘ dfsget  <hdfs-path>  <local-path>
HDFS の使用方法 hadoop@pficore:~$ dfsls Found 0 items hadoop@pficore:~$ dfsput hoge.txt hoge.txt hadoop@pficore:~$ dfsls Found 1 items /user/hadoop/hoge.txt  <r 3>  31  2008-05-08 12:00
HDFS の特徴 Master/Slave アーキテクチャなので、 Master に付加が高まると全体のスループットが落ちる メタデータ操作を少なくするのがポイント 出来るだけ”少ない数”の”巨大なファイル”を格納するようにする 感覚的には数百 M ~数 G 程度のファイルを作るのが良さそう
Hadoop Programming on Hadoop with “Java”
Skipped for Kernel Hackers  ? who never want to write Java :-P
Hadoop Programming on Hadoop with “ Hadoop Streaming ” (sh, C, C++, Ruby, Python, etc.)
HadoopStreaming 標準入出力を介して MapReduce 処理を書けるようにするための仕組み sh  ?  C++  ?  Ruby  ?  Python など、任意の言語で MapReduce が可能になる http://hadoop.apache.org/core/docs/r0.15.3/streaming.html Hadoop Streaming は単純な wrapper ライブラリ 指定したプログラムの標準入力に <key, value> を渡す 標準出力から結果を読み込み、それを出力 Amazon, Facebook 等でも Streaming 経由で Hadoop が使われているらしい http://wiki.apache.org/hadoop/PoweredBy
使い方 実行方法 ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input   inputdir  [HDFS のパス ] -output  outputdir  [HDFS のパス ] -mapper  map  [map プログラムのパス ] -reduce  reduce  [reduce プログラムのパス ] -inputformat  [TextInputFormat | SequenceFileAsTextInputFormat] -outputformat  [TextOutputFormat]
InputFormat -inputformat オプション 入力ファイルのフォーマットを指定 TextInputFormat (default) ファイルの任意の位置で適当に分割される <k, v> = < ファイルのオフセット ,  ファイルの中身 > SequenceFileAsTextInputFormat 1 行 1map 処理 “ keyvalue” という列が並んでいるファイル郡を用意 <k, v> = <key, value> (1 行毎 )
TextInputFormat SequenceFileAsTextInputFormat Hoge fu ga fafa fdsaf dasfd sak fjadsj fdsaf dsafdsa fdsafdsafdsa fadsfdsa fdsa fsafds <offset, Hoge fu ga fafa fdsaf dasfd sak fjadsj fdsaf > <offset, fdsafdsafdsa fadsfdsa fdsa fsafds> k1 v1 k2 v2 k3 v3 <k1, v1> <k2, v2> <k3, v3> mapper mapper
OutputFormat -outputformat オプション 出力ファイルのフォーマットを指定 TextOutputFormat (default) “ keyvalue” が 1 行ずつ書き出される
Map 標準入力を読み込み inputFormat によって渡されるものが違う TextInputFormat の場合、 value しか渡されない 内部的には key にファイルの offset が使われているが、意味の無い情報なのでそもそも渡されないようになっている SequenceFileAsTextInput の場合、 key と value が    区切りで渡される 結果を標準出力に書き込み key’, value’ を    区切りで 1 行ずつ出力
Reduce 標準入力を読み込み keyval  のように 1 行で key と val が渡される 同じ key については1つの reducer 内で処理される事が保証されている 標準出力に書き出し key, value を    文字区切りで 1 行ずつ書き出し
例 : Ruby によるワードカウント map.rb #!/usr/bin/env ruby while !STDIN.eof? line = STDIN.readline.strip ws = line.split ws.each { |w| puts &quot;#{w}1“ } end reduce.rb #!/usr/bin/env ruby h = {} while !STDIN.eof? line = STDIN.readline.strip word = line.split(&quot;&quot;)[0] unless h.has_key? word h[word] = 1 else h[word] += 1 end end h.each { |w, c| puts &quot;#{w}#{c}“ } $ ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input wcinput -output wcoutput -mapper /home/hadoop/kzk/map.rb -reducer /home/hadoop/kzk/reduce.rb -inputformat TextInputFormat -outputformat TextOutputFormat
例 :  出力を圧縮する -jobconf  mapred.output.compress=true Key, Value それぞれに対して gzip 圧縮がかかる 読み込む為には特別なオプションは必要なし $ ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input wcinput -output wcoutput -mapper /home/hadoop/kzk/map.rb -reducer /home/hadoop/kzk/reduce.rb -inputformat TextInputFormat -outputformat TextOutputFormat -jobconf  mapred.output.compress=true
ためしに書いてみたもの ワードカウント Wikipedia 全体 中間ファイル圧縮 出力ファイル圧縮 検索インデックス作成 20G Lucene を使用 全部 I/O ネック 数十台程度の環境で特に問題なく動いた
Enjoy Playing Around Hadoop  ? Thank you! kzk

More Related Content

Googleの基盤クローン Hadoopについて

  • 1. Google の基盤クローン Hadoop について 太田 一樹 <kazuki.ohta@gmail.com>
  • 2. 自己紹介 太田一樹 東京大学情報理工学系研究科コンピューター科学専攻石川研究室 M1 HPC 系の話 ( 並列ファイルシステム ) 個人サイト http://kzk9.net/ http://kzk9.net/blog/ 興味 OS, ネットワーク , I/O, 分散システム OSS 的活動 I was a committer of KDE, uim, SigScheme copybench? Kernel Reporter
  • 3. とは? Google の基盤ソフトウェアのクローン Google File System, MapReduce Yahoo Research の Doug Cutting 氏が開発 元々は Lucene のサブプロジェクト Doug の子供の持っているぬいぐるみの名前 Java で記述 !
  • 4. Google 関連 参考論文 & スライド The Google File System Sanjay Ghemawat, Howard Gobioff, and Shu-Tak Leong, SOSP 2003 MapReduce: Simplified Data Processing on Large Clusters Jeffrey Dean and Sanjay Ghemawat, SOSP 2004 Parallel Architectures and Compilation Techniques (PACT) 2006, KeyNote http://www.cs.virginia.edu/~pact2006/program/mapreduce-pact06-keynote.pdf
  • 5. Hadoop 参考文献 Hadoop 公式サイト http://hadoop.apache.org/core/ Wiki: http://wiki.apache.org/hadoop/ インストール方法?チュートリアル?プレゼン資料など Running Hadoop on Ubuntu Linux http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_(Single-Node_Cluster) http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_%28Multi-Node_Cluster%29 Hadoop, hBase で構築する大規模データ処理システム on Codezine http://codezine.jp/a/article/aid/2448.aspx
  • 7. 性能 Apache Hadoop wins TeraSort Benchmark! http://developer.yahoo.com/blogs/hadoop/2008/07/apache_hadoop_wins_terabyte_sort_benchmark.html 規定フォーマットの 1T データをソート 209 seconds (5G/sec, 5M/sec per node) 910 nodes, 4 dual core Xeon 2.0GHz, 1G Ether 物量作戦
  • 8. 性能 ( 僕が測ってみた )
  • 9. 関連プロジェクト hBase BigTable クローン Pig クエリ言語で MapReduce プログラムを記述 clean = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
  • 11. 問題 インターネットの爆発的普及により、非常に大規模なデータが蓄積されている 例えば Web ページのサイズを考えてみる 200 億ページ * 20KB = 400 TB Disk 読み込み性能は 50MB/sec (SATA) 1 台では読み込むだけでも約 100 日 保存するだけでも 500G のディスクが 1000 個程度必要 このデータを効率的に処理したい
  • 12. 解決方法 お金 とにかく大量のマシンを用意 1000 台マシンがあれば 1 台で 400G 処理すれば ok 読み込むのに 8000 秒程度で済む
  • 13. お金だけでは解決しない プログラミングが非常に困難になる プロセス起動 プロセス監視 プロセス間通信 デバッグ 最適化 故障時への対応 しかも、新しいプログラムを作る度にこれらの問題をいちいち実装する必要がある うはー めんどくせー!
  • 14. 既存の分散 / 並列プログラミング環境 MPI (Message Passing Interface) 並列プログラミングのためのライブラリ スパコンの世界では主流 プログラマは各プロセスの挙動を記述 通信プリミティブ (Send, Recv, All-to-All) が提供されており、それを用いてデータ通信を実現 利点 通信パターンなどをプログラマがコントロールでき、問題に対して最適なプログラムを記述する事ができる
  • 15. MPI の問題点 問題点 耐障害性への考慮が少ない アプリケーションが独自にチェックポイント機能を実装 1 万台以上の環境で計算するには耐えられない 1 台が 1000 日程度で壊れるとすると、 1 日で 10 台程度壊れる 壊れる度にチェックポイントから戻すとかやってらんない RAID 組んでもそのうち壊れるので一緒 通信パターンなどを記述する作業が多くなり、実際のアルゴリズムを記述するのにたどり着くまで時間がかかる
  • 16. そこで MapReduce 大体の 大規模データ処理を行う問題に特化したプログラミングモデル アルゴリズムの記述のみにプログラマが集中できる ただし世の中の問題全てに対して最適なモデルではない ライブラリ側で面倒な事を全て担当 自動的に処理を分散 / 並列化 ロードバランシング ネットワーク転送?ディスク使用効率化 耐障害性の考慮 1 ノードで失敗したら違う場所でやりなおせばいいよね MapReduce が賢くなれば、それを使う全てのプログラムが賢くなる!
  • 17. MapReduce 型の処理 WordCount Grep Sort ( 適切な Partition 関数を選択する必要 ) Log Analysis Web Graph Generation Inverted Index Construction Machine Learning NaiveBayes, K-means, Expectation Maximization, etc.
  • 20. MapReduce の実行フロー Data Map Data Map Data Map Reduce Reduce Data Data Shuffle
  • 21. MapReduce の実行フロー 入力読み込み <key, value>* Map map: <key, value> ? <key’, value’>* Shuffle shuffle: <key’, reducers> ? destination reducer Reduce reduce: <key’, <value’> * > ? <key’’, value’’>* 出力書き出し <key’’, value’’>*
  • 22. MapReduce の実行フロー Data Map Data Map Data Map Reduce Reduce Data Data Shuffle <k, v>* <k, v>* <k, v>* <k, v>* ? <k’, v’>* <k’, <v’>*>* ? <k’’, v’’>* <k, v>* ? <k’, v’>* <k, v>* ? <k’, v’>* <k’, <v’>*>* ? <k’’, v’’>*
  • 23. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buzz 入力文書 : doc1
  • 24. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: foo doc1: foo doc1: bar doc1: bar doc1: buz
  • 25. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data Shuffle foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: bar doc1: bar doc1: buz doc1: foo doc1: foo
  • 26. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 doc1: foo doc1: bar doc1: bar doc1: buz doc1: foo doc1: foo foo: 1 foo: 1 bar: 1 foo: 1 bar: 1 buz: 1
  • 27. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 foo: 1 foo: 1 bar: 1 foo: 1 bar: 1 buz: 1 bar: <1, 1> buz: <1> foo: <1, 1, 1>
  • 28. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 bar: <1, 1> buz: <1> foo: <1, 1, 1>
  • 29. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 foo: <1, 1, 1> bar: <1, 1> buz: <1> foo: 3 bar: 2 buz: 1
  • 30. 例 : ワードカウント Data Map Data Map Data Map Reduce Reduce Data Data foo foo foo bar bar buz 入力文書 : doc1 bar: 2 buz: 1 foo: 3
  • 31. 例 : ワードカウント 擬似コード map(string key, string value) { foreach word in value: emit(word, 1); } reduce(string key, vector<int> values) { int result = 0; for (int i = 0; I < values.size(); i++) result += values[i]; emit(key, result); }
  • 32. MapReduce の特徴 データ通信 各 Map 処理、 Reduce 処理は完全に並列に実行可能 マシンを増やせばその分処理能力が増える 耐故障性 失敗した Map, Reduce 処理は他のノードで再実行される 遅い Map, Reduce 処理についても同じ ローカリティ データのある場所で計算を始めれば、ネットワークを使う必要がなくなる Moving Computation is Cheaper Than Moving Data
  • 33. 論文に書かれていない、僕が思う MapReduce の短所 Shuffle フェーズで大規模に通信が発生 全 Mapper <-> 全 Reducer の通信 輻輳が起こって、台数が多いと進まなくなりそう 数百~数千台程度のクラスタがいくつも有る? Map Map Map Reduce Reduce Shuffle
  • 35. Hadoop の中身 Hadoop Distributed File System (HDFS) GFS のクローン MapReduce プログラムの入力や出力に使用 Hadoop MapReduce MapReduce 実現するためのサーバー , ライブラリ
  • 36. Hadoop Distributed File System Master/Slave アーキテクチャ ファイルはブロックという単位に分割して保存 NameNode Master ファイルのメタデータ ( パス?権限など ) を管理 DataNode Slave 実際のデータ ( ブロックを管理 )
  • 39. レプリケーションのアルゴリズム (2) Hadoop は結構適当にやっている 1 つ目は必ずローカルに書く 2 つ目は異なるラックに書く 3 つ目は同じラックの違うノードに書く 4 つ目移行はランダム 意外とこういう適当なのが 上手く行くの かもしれない
  • 40. GFS に有って HDFS に無いもの Atomic な Append HADOOP-1700 http://issues.apache.org/jira/browse/HADOOP-1700 ロック GFS では Chubby と呼ばれる分散ロックサービスを使用している Hadoop には Chubby 相当のものがないので、ファイルの上書きは出来ず、新規作成しか出来ない
  • 41. Hadoop MapReduce Master/Slave アーキテクチャ JobTracker Master Job を Task に分割し、 Task を各 TaskTracker に分配 Job: MapReduce プログラムの実行単位 Task: MapTask, ReduceTask 全ての Task の進行状況を監視し、死んだり遅れたりした Task は別の TaskTracker で実行させる TaskTracker Slave JobTracker にアサインされた Task を実行 実際の計算処理を行う
  • 43. Map フェーズ 分割された入力を読み込み、 map プログラムを動かす Partitioner( 通常は Hash) を使用して宛先を決定 バッファサイズが閾値を越えたらメモリ上でソートしてディスクに書き出す すべてが終わったらソートされたものをさらに外部マージソート (k, v) Reducer1 宛て Reducer2 宛て
  • 44. Reduce フェーズ Map フェーズの出力をフェッチ メモリ上でキー毎にまとめあげる Reduce プログラムを動かす 出力を HDFS に書き出し Amazon S3 などにも書き出せる
  • 46. HDFS の操作方法 # ls alias dfsls='~/hadoop/bin/hadoop dfs -ls‘ # ls -r alias dfslsr='~/hadoop/bin/hadoop dfs -lsr‘ # rm alias dfsrm='~/hadoop/bin/hadoop dfs -rm‘ # rm -r alias dfsrmr='~/hadoop/bin/hadoop dfs -rmr' # cat alias dfscat='~/hadoop/bin/hadoop dfs -cat‘ # mkdir alias dfsmkdir='~/hadoop/bin/hadoop dfs -mkdir‘
  • 47. HDFS の操作方法 hadoop@pficore:~$ dfsls Found 5 items /user/hadoop/access-log <r 3> 3003 2008-04-30 00:21 /user/hadoop/hoge <r 3> 2183 2008-04-30 00:32 /user/hadoop/reported <dir> 2008-04-30 00:28 /user/hadoop/wcinput <r 3> 29 2008-05-08 10:17 /user/hadoop/wcoutput <dir> 2008-05-08 11:48
  • 48. HDFS の操作方法 HDFS 上にファイルを転送 HDFS 上からファイルを転送 alias dfsput='~/hadoop/bin/hadoop dfs -put‘ dfsput <local-path> <hdfs-path> alias dfsget='~/hadoop/bin/hadoop dfs -get‘ dfsget <hdfs-path> <local-path>
  • 49. HDFS の使用方法 hadoop@pficore:~$ dfsls Found 0 items hadoop@pficore:~$ dfsput hoge.txt hoge.txt hadoop@pficore:~$ dfsls Found 1 items /user/hadoop/hoge.txt <r 3> 31 2008-05-08 12:00
  • 50. HDFS の特徴 Master/Slave アーキテクチャなので、 Master に付加が高まると全体のスループットが落ちる メタデータ操作を少なくするのがポイント 出来るだけ”少ない数”の”巨大なファイル”を格納するようにする 感覚的には数百 M ~数 G 程度のファイルを作るのが良さそう
  • 51. Hadoop Programming on Hadoop with “Java”
  • 52. Skipped for Kernel Hackers ? who never want to write Java :-P
  • 53. Hadoop Programming on Hadoop with “ Hadoop Streaming ” (sh, C, C++, Ruby, Python, etc.)
  • 54. HadoopStreaming 標準入出力を介して MapReduce 処理を書けるようにするための仕組み sh ? C++ ? Ruby ? Python など、任意の言語で MapReduce が可能になる http://hadoop.apache.org/core/docs/r0.15.3/streaming.html Hadoop Streaming は単純な wrapper ライブラリ 指定したプログラムの標準入力に <key, value> を渡す 標準出力から結果を読み込み、それを出力 Amazon, Facebook 等でも Streaming 経由で Hadoop が使われているらしい http://wiki.apache.org/hadoop/PoweredBy
  • 55. 使い方 実行方法 ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input inputdir [HDFS のパス ] -output outputdir [HDFS のパス ] -mapper map [map プログラムのパス ] -reduce reduce [reduce プログラムのパス ] -inputformat [TextInputFormat | SequenceFileAsTextInputFormat] -outputformat [TextOutputFormat]
  • 56. InputFormat -inputformat オプション 入力ファイルのフォーマットを指定 TextInputFormat (default) ファイルの任意の位置で適当に分割される <k, v> = < ファイルのオフセット , ファイルの中身 > SequenceFileAsTextInputFormat 1 行 1map 処理 “ keyvalue” という列が並んでいるファイル郡を用意 <k, v> = <key, value> (1 行毎 )
  • 57. TextInputFormat SequenceFileAsTextInputFormat Hoge fu ga fafa fdsaf dasfd sak fjadsj fdsaf dsafdsa fdsafdsafdsa fadsfdsa fdsa fsafds <offset, Hoge fu ga fafa fdsaf dasfd sak fjadsj fdsaf > <offset, fdsafdsafdsa fadsfdsa fdsa fsafds> k1 v1 k2 v2 k3 v3 <k1, v1> <k2, v2> <k3, v3> mapper mapper
  • 58. OutputFormat -outputformat オプション 出力ファイルのフォーマットを指定 TextOutputFormat (default) “ keyvalue” が 1 行ずつ書き出される
  • 59. Map 標準入力を読み込み inputFormat によって渡されるものが違う TextInputFormat の場合、 value しか渡されない 内部的には key にファイルの offset が使われているが、意味の無い情報なのでそもそも渡されないようになっている SequenceFileAsTextInput の場合、 key と value が 区切りで渡される 結果を標準出力に書き込み key’, value’ を 区切りで 1 行ずつ出力
  • 60. Reduce 標準入力を読み込み keyval のように 1 行で key と val が渡される 同じ key については1つの reducer 内で処理される事が保証されている 標準出力に書き出し key, value を 文字区切りで 1 行ずつ書き出し
  • 61. 例 : Ruby によるワードカウント map.rb #!/usr/bin/env ruby while !STDIN.eof? line = STDIN.readline.strip ws = line.split ws.each { |w| puts &quot;#{w}1“ } end reduce.rb #!/usr/bin/env ruby h = {} while !STDIN.eof? line = STDIN.readline.strip word = line.split(&quot;&quot;)[0] unless h.has_key? word h[word] = 1 else h[word] += 1 end end h.each { |w, c| puts &quot;#{w}#{c}“ } $ ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input wcinput -output wcoutput -mapper /home/hadoop/kzk/map.rb -reducer /home/hadoop/kzk/reduce.rb -inputformat TextInputFormat -outputformat TextOutputFormat
  • 62. 例 : 出力を圧縮する -jobconf mapred.output.compress=true Key, Value それぞれに対して gzip 圧縮がかかる 読み込む為には特別なオプションは必要なし $ ./bin/hadoop jar contrib/hadoop-0.15.3-streaming.jar -input wcinput -output wcoutput -mapper /home/hadoop/kzk/map.rb -reducer /home/hadoop/kzk/reduce.rb -inputformat TextInputFormat -outputformat TextOutputFormat -jobconf mapred.output.compress=true
  • 63. ためしに書いてみたもの ワードカウント Wikipedia 全体 中間ファイル圧縮 出力ファイル圧縮 検索インデックス作成 20G Lucene を使用 全部 I/O ネック 数十台程度の環境で特に問題なく動いた
  • 64. Enjoy Playing Around Hadoop ? Thank you! kzk