2. 徭失B初
? Kimura, Sotaro(@kimutansk)
C なんでもやデ`タエンジニア @ ドワンゴ
? デ`タ蛍裂児Pの砿尖
? デ`タにvBするj喘Sことなら寄悶なんでも
C 挫きな室g蛍勸
? ストリ`ムI尖麼にJVM貧
? 蛍柊システム
C 寄俳だと房うKafkaのO協朕
? advertised.listeners
? cleanup.policy
? unclean.leader.election.enable
3. Spark Structured Streamingとは
? Spark SQL貧でストリ`ムI尖アプリケ`ションを
gにMむためのコンポ`ネント
C Spark2.1狼のr泣でα井
C だが、databricksブログでシリ`ズ誘後吉其竃は謹い
C バッチI尖と揖の圭隈でストリ`ムI尖を峰辛嬬
C Scala/Java/PythonのDataset/DataFrame APIで峰
C Dataset/DataFrameを喘いることで
夛晒デ`タとして恷m晒された彜Bで嘛
? メモリ聞喘楚のs
? ベクトル處麻によるCPUリソ`スの嗤浸醵
4. Spark Structured Streamingとは
? モダンなストリ`ムI尖の勣殆をレくカバ`
C Out of orderなデ`タに
? Event Timeベ`スでI尖を佩うことが辛嬬
? Watermarkにより、S否するW决もO協辛嬬
C Windowv方で匯協rgごとにデ`タを曝俳ってI尖辛嬬
? 1蛍gの曝俳りで5蛍g蛍のデ`タをI尖などの
Sliding Windowも旋喘辛嬬
C Accumulation ModeOutput/Update Modeも峺協辛嬬
? ただし、I尖坪否でm喘辛嬬なモ`ドは泙蕕譴
C ストリ`ムI尖vB喘Zは參和のY創歌孚
? http://niconare.nicovideo.jp/watch/kn2358
10. gHのアプリケ`ション箭
? gHにMんでみて、gにできたものは
C ストリ`ミングETL
? KafkaのTopic兆各をデ`タのスキ`マに鬉気擦討けば、
Topicのパタ`ン峺協で1アプリケ`ションで匯凄I尖辛嬬
C 箭災塹造TopicをリアルタイムでQしてHDFSに誘秘
C example_distributor_action
C example_audience_action
C example_general_action
? Topic阿乏薦ディレクトリを蛍けたい
? 晩、rgg了で竃薦ディレクトリを蛍けたい
? 1蛍おきにg佩したい
11. gHのアプリケ`ション箭
// Sparkアプリケ`ション伏撹
val sparkSession = SparkSession
.builder
.appName("StreamingETLExample")
.getOrCreate()
// example_で兵まるKafka Topicをiみzむ
val kafkaDs = sparkSession.readStream.format("kafka").
option(kafka.bootstrap.servers", "host1:port1,host2:port2,host3:port3").
option("subscribePattern", "example_.*").load()
// デ`タのQv方?r震の渇竃v方をUDFとしてO協
val exampleConvertUdf = udf(funcExampleConvert)
val extractTimestampUdf = udf(funcExtractTimestamp)
12. gHのアプリケ`ション箭
// デ`タをQし、グル`プ蛍けに駅勣 or 竃薦カラムにgる
import sparkSession.implicits._
val convertedDs =
kafkaDs.selectExpr("topic", "CAST(value AS STRING) as value").
withColumn("converted_value", exampleConvertUdf('value)).
withColumn("data_timestamp_str", extractTimestampUdf('value)).
withColumn("data_timestamp",
unix_timestamp($"data_timestamp_str", "yyyy-MM-dd'T'HH:mm:ssXXX")).
withColumn("date", from_unixtime($"data_timestamp", "yyyyMMdd")).
withColumn("hour", from_unixtime($"data_timestamp", "HH")).
selectExpr("topic", "date", "hour", "converted_value")