18. Creation
? DataFrame API で扱うデータの読み込み
? JSON, Hive, Purque などが利用できる
18
// Create a SQLContext (sc is an existing SparkContext)
val context = new org.apache.spark.sql.SQLContext(sc)
// Create a DataFrame for Github events
var path = "file:///tmp/github-archive-data/*.json.gz"
val event = context.load(path, "json").as('event)
// Create a DataFrame for Github users
path = "file:///tmp/github-archive-data/github-users.json"
val user = context.load(path, "json").as('user)
20. Project
? select() で取り出したいカラムを選択
– $”parent.child” の記法でネストされたカラムも選択できる
? select(‘key as ‘alias) のように as でエイリアスを作れる
20
// Select a column
event("public”)
event.select(‘public as ‘PUBLIC)
// Select multile columns with aliases
event.select('public as 'PUBLIC, 'id as 'ID)
// Select nested columns with aliases
event.select($"payload.size" as 'size, $"actor.id" as 'actor_id)
21. Filter
? filter() は SQL の WHERE のような役割
? 複数の条件を指定する場合は,ひとつの filter() に入れるこ
とも,ふたつの filter() のチェーンに分けることもできる
21
// Filter by a condition
user.filter("name is not null”)
// Filter by a comblination of two conditions
event.filter("public = true and type = 'ForkEvent'”)
event.filter("public = true").filter("type = 'ForkEvent'”)
22. Aggregation
? count() は単純にレコード数を数える
? groupBy() は SQL の GROUP BY の働き
? agg() を組み合わせることで威力をさらに発揮
22
// Count the number of records
user.count
// Group by ‘type column and then count
event.groupBy("type").count()
// Aggregate by ‘id column
user.groupBy('id).agg('id, count("*"), sum('id))
23. Join
? まず as() で各 DataFrame のエイリアスを登録
? join() と where() で結合と結合条件を指定
? 結合した結果のカラムを取り出すには,登録したエ
イリアスを利用すれば良い
23
// Register their aliases
val user = user.as('user)
val pr = event.filter('type === "PullRequestEvent").as('pr)
// Join the two data sets
val join = pr.join(user).where($"pr.payload.pull_request.user.id" === $"user.id")
join.select($"pr.type", $"user.name", $"pr.created_at”)
24. UDF: User Defined Function
? udf() で独自関数を定義して,DataFrame の中で利用できる
? 例:文字列 “2015-01-01T00:00:00Z” から
– “2015-01-01” を抽出する関数を定義
– “00:00:00” を抽出する関数を定義
24
// Define User Defined Functions
val toDate = udf((createdAt: String) => createdAt.substring(0, 10))
val toTime = udf((createdAt: String) => createdAt.substring(11, 19))
// Use the UDFs in select()
event.select(toDate('created_at) as 'date, toTime('created_at) as 'time)
26. With Machine Learning (1)
? Github のコミットメッセージに対して,word2vec を適応
26
val toIter = udf((x: ArrayBuffer[String]) => x.mkString(delimiter))
val messages = event.
select($"payload.commits.message" as 'messages).filter("messages is not null").
select(toIter($"messages")).
flatMap(row => row.toSeq.map(_.toString).apply(0).split(delimiter))
val message = messages.map(_.replaceAll("""(n|)""", "").
replaceAll("""s+""", " ").split(" ").map(_.replaceAll("""(,|.)$""", "")).toSeq).
filter(_.size > 0)
// create a model
val model = new Word2Vec().fit(message)