狠狠撸

狠狠撸Share a Scribd company logo
Spark DataFrames Introduction
道玄坂LT祭り第2回(統計、機械学習、データ抽出)
Yu Ishikawa
1
発表のゴール
? Hadoop MapReduce を書くのめんどいわー
? Python ユーザや R ユーザが,ビッグデータを
気軽に扱うにはどうすりゃいいの?
2
それ Spark DataFrame でできるよ
Apache Spark 知ってるよね?
3
Speed
4
Hadoop MapReduce に比べて 10x ~ 100x 早い
Ease of Use
5
PageRank も数行で実装できる
Generality
6
機械学習ライブラリなども標準で利用できる
Runs Everywhere
7
Hadoop, Mesos などと豊富な连携
2015 年の重点開発方針
? Data Science
– 使いやすい high-level APIs の提供
? scikit-learn のような一貫性をもった APIs
? Platform Interfaces
– 標準対応以外のデータソースへのアクセスやアルゴリズ
ムをより簡単に利用できるインターフェースの提供
? spark-packages というパッケージ管理ツールのようなもので 3rd
party ライブラリを利用できるようにする
8
DataFrame APIs の紹介
「Spark なにそれ?」って人も多いと
思うので表面的な話をします
9
Agenda
? 導入
? DataFrame APIs とは?
? DataFrame APIs の紹介
? Demo
? まとめ
10
Agenda
? 導入
? DataFrame APIs とは?
? DataFrame APIs の紹介
? Demo
? まとめ
11
DataFrame APIs とは
? データサイエンスでよく使われる処理を Domain-specific functions
にしたもの
– Project
– Filter
– Aggregation
– Join
– UDFs
? Python, Java, Scala and R (via SparkR) から利用できる
? Spark 1.3 でリリースされる機能
12
ビッグデータを Spark 上で
より簡潔でより高速に処理できる機能
Spark ならより簡単に MR を実装できる
13
DataFrame APIs で Spark をより簡潔に操作
14
DataFrame の最適化機構によってより高速に処理ができる
15
Agenda
? 導入
? DataFrame APIs とは?
? DataFrame APIs の紹介
? Demo
? まとめ
16
主な DataFrame APIs
? Creation
? Check Schema
? Project
? Filter
? Aggregation
? Join
? UDFs
17
Creation
? DataFrame API で扱うデータの読み込み
? JSON, Hive, Purque などが利用できる
18
// Create a SQLContext (sc is an existing SparkContext)
val context = new org.apache.spark.sql.SQLContext(sc)
// Create a DataFrame for Github events
var path = "file:///tmp/github-archive-data/*.json.gz"
val event = context.load(path, "json").as('event)
// Create a DataFrame for Github users
path = "file:///tmp/github-archive-data/github-users.json"
val user = context.load(path, "json").as('user)
Check Schema
? printSchema でスキーマを確認できる
19
event.printSchema
root
|-- actor: struct (nullable = true)
| |-- avatar_url: string (nullable = true)
| |-- gravatar_id: string (nullable = true)
| |-- id: long (nullable = true)
| |-- login: string (nullable = true)
| |-- url: string (nullable = true)
|-- created_at: string (nullable = true)
|-- id: string (nullable = true)
….
Project
? select() で取り出したいカラムを選択
– $”parent.child” の記法でネストされたカラムも選択できる
? select(‘key as ‘alias) のように as でエイリアスを作れる
20
// Select a column
event("public”)
event.select(‘public as ‘PUBLIC)
// Select multile columns with aliases
event.select('public as 'PUBLIC, 'id as 'ID)
// Select nested columns with aliases
event.select($"payload.size" as 'size, $"actor.id" as 'actor_id)
Filter
? filter() は SQL の WHERE のような役割
? 複数の条件を指定する場合は,ひとつの filter() に入れるこ
とも,ふたつの filter() のチェーンに分けることもできる
21
// Filter by a condition
user.filter("name is not null”)
// Filter by a comblination of two conditions
event.filter("public = true and type = 'ForkEvent'”)
event.filter("public = true").filter("type = 'ForkEvent'”)
Aggregation
? count() は単純にレコード数を数える
? groupBy() は SQL の GROUP BY の働き
? agg() を組み合わせることで威力をさらに発揮
22
// Count the number of records
user.count
// Group by ‘type column and then count
event.groupBy("type").count()
// Aggregate by ‘id column
user.groupBy('id).agg('id, count("*"), sum('id))
Join
? まず as() で各 DataFrame のエイリアスを登録
? join() と where() で結合と結合条件を指定
? 結合した結果のカラムを取り出すには,登録したエ
イリアスを利用すれば良い
23
// Register their aliases
val user = user.as('user)
val pr = event.filter('type === "PullRequestEvent").as('pr)
// Join the two data sets
val join = pr.join(user).where($"pr.payload.pull_request.user.id" === $"user.id")
join.select($"pr.type", $"user.name", $"pr.created_at”)
UDF: User Defined Function
? udf() で独自関数を定義して,DataFrame の中で利用できる
? 例:文字列 “2015-01-01T00:00:00Z” から
– “2015-01-01” を抽出する関数を定義
– “00:00:00” を抽出する関数を定義
24
// Define User Defined Functions
val toDate = udf((createdAt: String) => createdAt.substring(0, 10))
val toTime = udf((createdAt: String) => createdAt.substring(11, 19))
// Use the UDFs in select()
event.select(toDate('created_at) as 'date, toTime('created_at) as 'time)
Agenda
? 導入
? DataFrame APIs とは?
? DataFrame APIs の紹介
? Demo
? まとめ
25
With Machine Learning (1)
? Github のコミットメッセージに対して,word2vec を適応
26
val toIter = udf((x: ArrayBuffer[String]) => x.mkString(delimiter))
val messages = event.
select($"payload.commits.message" as 'messages).filter("messages is not null").
select(toIter($"messages")).
flatMap(row => row.toSeq.map(_.toString).apply(0).split(delimiter))
val message = messages.map(_.replaceAll("""(n|)""", "").
replaceAll("""s+""", " ").split(" ").map(_.replaceAll("""(,|.)$""", "")).toSeq).
filter(_.size > 0)
// create a model
val model = new Word2Vec().fit(message)
With Machine Learning (2)
? 構築したモデルで類似語を出力
27
> model.findSynonyms("bug", 10).foreach(println)
(issue,0.6874246597290039)
(typo,0.663004457950592)
(bugs,0.599325954914093)
(errors,0.5887047052383423)
(problem,0.5665265321731567)
(fixes,0.5617778897285461)
(spelling,0.5353047847747803)
(crash,0.5330312848091125)
(Fixed,0.5128884315490723)
(small,0.5113803744316101)
Agenda
? 導入
? DataFrame APIs とは?
? DataFrame APIs の紹介
? Demo
? まとめ
28
まとめ
? DataFrame APIs は,ビッグデータを Spark 上
でより簡潔より高速に処理できる機能
? groupBy, agg, count, join などのデータ操作で
よく使う関数が準備されている
– Pandas を知っている人は,あんな感じをイメージ
してくれるとよい
? UDF で独自の関数も定義できる
? 機械学習ライブラリとの組み合わせられる
29
DataFrame の課題
? Apache Spark の機械学習ライブラリ mllib に
つなぎこむためのデータ変換が面倒くさい
? よりシームレスに連携できるような仕組みが
必要だと思われる
30
DataFrame Introduction
? spark-dataframe-introduction
? http://goo.gl/Futoi0
31

More Related Content

2015 03-12 道玄坂LT祭り第2回 Spark DataFrame Introduction

  • 2. 発表のゴール ? Hadoop MapReduce を書くのめんどいわー ? Python ユーザや R ユーザが,ビッグデータを 気軽に扱うにはどうすりゃいいの? 2 それ Spark DataFrame でできるよ
  • 5. Ease of Use 5 PageRank も数行で実装できる
  • 7. Runs Everywhere 7 Hadoop, Mesos などと豊富な连携
  • 8. 2015 年の重点開発方針 ? Data Science – 使いやすい high-level APIs の提供 ? scikit-learn のような一貫性をもった APIs ? Platform Interfaces – 標準対応以外のデータソースへのアクセスやアルゴリズ ムをより簡単に利用できるインターフェースの提供 ? spark-packages というパッケージ管理ツールのようなもので 3rd party ライブラリを利用できるようにする 8
  • 9. DataFrame APIs の紹介 「Spark なにそれ?」って人も多いと 思うので表面的な話をします 9
  • 10. Agenda ? 導入 ? DataFrame APIs とは? ? DataFrame APIs の紹介 ? Demo ? まとめ 10
  • 11. Agenda ? 導入 ? DataFrame APIs とは? ? DataFrame APIs の紹介 ? Demo ? まとめ 11
  • 12. DataFrame APIs とは ? データサイエンスでよく使われる処理を Domain-specific functions にしたもの – Project – Filter – Aggregation – Join – UDFs ? Python, Java, Scala and R (via SparkR) から利用できる ? Spark 1.3 でリリースされる機能 12 ビッグデータを Spark 上で より簡潔でより高速に処理できる機能
  • 13. Spark ならより簡単に MR を実装できる 13
  • 14. DataFrame APIs で Spark をより簡潔に操作 14
  • 16. Agenda ? 導入 ? DataFrame APIs とは? ? DataFrame APIs の紹介 ? Demo ? まとめ 16
  • 17. 主な DataFrame APIs ? Creation ? Check Schema ? Project ? Filter ? Aggregation ? Join ? UDFs 17
  • 18. Creation ? DataFrame API で扱うデータの読み込み ? JSON, Hive, Purque などが利用できる 18 // Create a SQLContext (sc is an existing SparkContext) val context = new org.apache.spark.sql.SQLContext(sc) // Create a DataFrame for Github events var path = "file:///tmp/github-archive-data/*.json.gz" val event = context.load(path, "json").as('event) // Create a DataFrame for Github users path = "file:///tmp/github-archive-data/github-users.json" val user = context.load(path, "json").as('user)
  • 19. Check Schema ? printSchema でスキーマを確認できる 19 event.printSchema root |-- actor: struct (nullable = true) | |-- avatar_url: string (nullable = true) | |-- gravatar_id: string (nullable = true) | |-- id: long (nullable = true) | |-- login: string (nullable = true) | |-- url: string (nullable = true) |-- created_at: string (nullable = true) |-- id: string (nullable = true) ….
  • 20. Project ? select() で取り出したいカラムを選択 – $”parent.child” の記法でネストされたカラムも選択できる ? select(‘key as ‘alias) のように as でエイリアスを作れる 20 // Select a column event("public”) event.select(‘public as ‘PUBLIC) // Select multile columns with aliases event.select('public as 'PUBLIC, 'id as 'ID) // Select nested columns with aliases event.select($"payload.size" as 'size, $"actor.id" as 'actor_id)
  • 21. Filter ? filter() は SQL の WHERE のような役割 ? 複数の条件を指定する場合は,ひとつの filter() に入れるこ とも,ふたつの filter() のチェーンに分けることもできる 21 // Filter by a condition user.filter("name is not null”) // Filter by a comblination of two conditions event.filter("public = true and type = 'ForkEvent'”) event.filter("public = true").filter("type = 'ForkEvent'”)
  • 22. Aggregation ? count() は単純にレコード数を数える ? groupBy() は SQL の GROUP BY の働き ? agg() を組み合わせることで威力をさらに発揮 22 // Count the number of records user.count // Group by ‘type column and then count event.groupBy("type").count() // Aggregate by ‘id column user.groupBy('id).agg('id, count("*"), sum('id))
  • 23. Join ? まず as() で各 DataFrame のエイリアスを登録 ? join() と where() で結合と結合条件を指定 ? 結合した結果のカラムを取り出すには,登録したエ イリアスを利用すれば良い 23 // Register their aliases val user = user.as('user) val pr = event.filter('type === "PullRequestEvent").as('pr) // Join the two data sets val join = pr.join(user).where($"pr.payload.pull_request.user.id" === $"user.id") join.select($"pr.type", $"user.name", $"pr.created_at”)
  • 24. UDF: User Defined Function ? udf() で独自関数を定義して,DataFrame の中で利用できる ? 例:文字列 “2015-01-01T00:00:00Z” から – “2015-01-01” を抽出する関数を定義 – “00:00:00” を抽出する関数を定義 24 // Define User Defined Functions val toDate = udf((createdAt: String) => createdAt.substring(0, 10)) val toTime = udf((createdAt: String) => createdAt.substring(11, 19)) // Use the UDFs in select() event.select(toDate('created_at) as 'date, toTime('created_at) as 'time)
  • 25. Agenda ? 導入 ? DataFrame APIs とは? ? DataFrame APIs の紹介 ? Demo ? まとめ 25
  • 26. With Machine Learning (1) ? Github のコミットメッセージに対して,word2vec を適応 26 val toIter = udf((x: ArrayBuffer[String]) => x.mkString(delimiter)) val messages = event. select($"payload.commits.message" as 'messages).filter("messages is not null"). select(toIter($"messages")). flatMap(row => row.toSeq.map(_.toString).apply(0).split(delimiter)) val message = messages.map(_.replaceAll("""(n|)""", ""). replaceAll("""s+""", " ").split(" ").map(_.replaceAll("""(,|.)$""", "")).toSeq). filter(_.size > 0) // create a model val model = new Word2Vec().fit(message)
  • 27. With Machine Learning (2) ? 構築したモデルで類似語を出力 27 > model.findSynonyms("bug", 10).foreach(println) (issue,0.6874246597290039) (typo,0.663004457950592) (bugs,0.599325954914093) (errors,0.5887047052383423) (problem,0.5665265321731567) (fixes,0.5617778897285461) (spelling,0.5353047847747803) (crash,0.5330312848091125) (Fixed,0.5128884315490723) (small,0.5113803744316101)
  • 28. Agenda ? 導入 ? DataFrame APIs とは? ? DataFrame APIs の紹介 ? Demo ? まとめ 28
  • 29. まとめ ? DataFrame APIs は,ビッグデータを Spark 上 でより簡潔より高速に処理できる機能 ? groupBy, agg, count, join などのデータ操作で よく使う関数が準備されている – Pandas を知っている人は,あんな感じをイメージ してくれるとよい ? UDF で独自の関数も定義できる ? 機械学習ライブラリとの組み合わせられる 29
  • 30. DataFrame の課題 ? Apache Spark の機械学習ライブラリ mllib に つなぎこむためのデータ変換が面倒くさい ? よりシームレスに連携できるような仕組みが 必要だと思われる 30