際際滷

際際滷Share a Scribd company logo
Spark Structured Streamingで
Kafkaクラスタのデ`タをお返X試喘
2017/07/06
Apache Kafka Meetup Japan #3
Kimura, Sotaro@kimutansk
https://www.flickr.com/photos/savannahcorps/9256762362
徭失B初
? Kimura, Sotaro(@kimutansk)
C なんでもやデ`タエンジニア @ ドワンゴ
? デ`タ蛍裂児Pの砿尖
? デ`タにvBするj喘Sことなら寄悶なんでも
C 挫きな室g蛍勸
? ストリ`ムI尖麼にJVM貧
? 蛍柊システム
C 寄俳だと房うKafkaのO協朕
? advertised.listeners
? cleanup.policy
? unclean.leader.election.enable
Spark Structured Streamingとは
? Spark SQL貧でストリ`ムI尖アプリケ`ションを
gにMむためのコンポ`ネント
C Spark2.1狼のr泣でα井
C だが、databricksブログでシリ`ズ誘後吉其竃は謹い
C バッチI尖と揖の圭隈でストリ`ムI尖を峰辛嬬
C Scala/Java/PythonのDataset/DataFrame APIで峰
C Dataset/DataFrameを喘いることで
夛晒デ`タとして恷m晒された彜Bで嘛
? メモリ聞喘楚のs
? ベクトル處麻によるCPUリソ`スの嗤浸醵
Spark Structured Streamingとは
? モダンなストリ`ムI尖の勣殆をレくカバ`
C Out of orderなデ`タに
? Event Timeベ`スでI尖を佩うことが辛嬬
? Watermarkにより、S否するW决もO協辛嬬
C Windowv方で匯協rgごとにデ`タを曝俳ってI尖辛嬬
? 1蛍gの曝俳りで5蛍g蛍のデ`タをI尖などの
Sliding Windowも旋喘辛嬬
C Accumulation ModeOutput/Update Modeも峺協辛嬬
? ただし、I尖坪否でm喘辛嬬なモ`ドは泙蕕譴
C ストリ`ムI尖vB喘Zは參和のY創歌孚
? http://niconare.nicovideo.jp/watch/kn2358
gなアプリケ`ション箭
// Sparkアプリケ`ション伏撹
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
// ロ`カルポ`ト貧にソケットを伏撹してデ`タを棋ち鞭け
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
gなアプリケ`ション箭
// 秘薦デ`タをgZ阿坊峺
val words = lines.as[String].flatMap(_.split(" "))
// 秘薦gZ阿縫ウント
val wordCounts = words.groupBy("value").count()
// 鹿Y惚を飴悗垢戮謄灰鵐秋`ルに竃薦
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
// アプリケ`ションが翌何から唯峭されるまでg佩
query.awaitTermination()
gなアプリケ`ションイメ`ジ
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
どうKafkaとB亊できるのか
? spark-sql-kafkaを聞喘
C 參和の峺協でKafkaのデ`タを函誼辛嬬
? Topic + Partition
? Topic
? Topicのパタ`ン屎ア蹶Fで峺協辛嬬
C デ`タのメタ秤鵑睛rに函誼するため旋喘辛嬬
key: binary (nullable = true)
value: binary (nullable = true)
topic: string (nullable = true)
partition: integer (nullable = true)
offset: long (nullable = true)
timestamp: timestamp (nullable = true)
timestampType: integer (nullable = true)
どうKafkaとB亊できるのか
? 聞喘貧の廣吭泣
C offsetはSpark箸WALとして砿尖されるため、
麿アプリケ`ションとConsumerGroupを喘いたB亊は音辛
? gH駅勣になることはほぼないとは房いますが
C 寄楚Topicをiむとマイクロバッチg佩rgはLくなる
? KafkaのTopic-PartitionがSparkの1Partitionに鬉垢襪燭
? 輝たり念ではありますが
C まだα井なのでAPIやその麿が笋錣訖苗榻圓△
? 繁議にはWALがSparkのバ`ジョンアップで
iめなくならないか、が掲械に伽い???
gHのアプリケ`ション箭
? gHにMんでみて、gにできたものは
C ストリ`ミングETL
? KafkaのTopic兆各をデ`タのスキ`マに鬉気擦討けば、
Topicのパタ`ン峺協で1アプリケ`ションで匯凄I尖辛嬬
C 箭災塹造TopicをリアルタイムでQしてHDFSに誘秘
C example_distributor_action
C example_audience_action
C example_general_action
? Topic阿乏薦ディレクトリを蛍けたい
? 晩、rgg了で竃薦ディレクトリを蛍けたい
? 1蛍おきにg佩したい
gHのアプリケ`ション箭
// Sparkアプリケ`ション伏撹
val sparkSession = SparkSession
.builder
.appName("StreamingETLExample")
.getOrCreate()
// example_で兵まるKafka Topicをiみzむ
val kafkaDs = sparkSession.readStream.format("kafka").
option(kafka.bootstrap.servers", "host1:port1,host2:port2,host3:port3").
option("subscribePattern", "example_.*").load()
// デ`タのQv方?r震の渇竃v方をUDFとしてO協
val exampleConvertUdf = udf(funcExampleConvert)
val extractTimestampUdf = udf(funcExtractTimestamp)
gHのアプリケ`ション箭
// デ`タをQし、グル`プ蛍けに駅勣 or 竃薦カラムにgる
import sparkSession.implicits._
val convertedDs =
kafkaDs.selectExpr("topic", "CAST(value AS STRING) as value").
withColumn("converted_value", exampleConvertUdf('value)).
withColumn("data_timestamp_str", extractTimestampUdf('value)).
withColumn("data_timestamp",
unix_timestamp($"data_timestamp_str", "yyyy-MM-dd'T'HH:mm:ssXXX")).
withColumn("date", from_unixtime($"data_timestamp", "yyyyMMdd")).
withColumn("hour", from_unixtime($"data_timestamp", "HH")).
selectExpr("topic", "date", "hour", "converted_value")
gHのアプリケ`ション箭
// デ`タQY惚をHDFSに竃薦
// 竃薦ディレクトリパス箭は參和
// /data/converted/topic=example_general_action/date=20170706/hour=23
// ☆gHは竃薦g侯肝及でファイル方が湯れ貧がるので廣吭
convertedDs.toDF().writeStream.
trigger(ProcessingTime("60 seconds")).
partitionBy("topic", "date", "hour").
outputMode("append").
option("compression", "snappy").
option("checkpointLocation", "/data/checkpoint").
format("parquet").
start("/data/converted").awaitTermination()
まとめ
? Spark Streaming?KafkaB亊は
Structured Streamingで曳^議gにできる
? }方のTopicをまとめてiみzめるため、
ストリ`ミングETLはユ`スケ`スとして嗤李
? ただしα井なので、
APIや、WALの侘塀の筝には廣吭
Thank you for your attention!
https://www.flickr.com/photos/savannahcorps/7409364642

More Related Content

Spark Structured StreamingでKafkaクラスタのデ`タをお返X試喘

  • 1. Spark Structured Streamingで Kafkaクラスタのデ`タをお返X試喘 2017/07/06 Apache Kafka Meetup Japan #3 Kimura, Sotaro@kimutansk https://www.flickr.com/photos/savannahcorps/9256762362
  • 2. 徭失B初 ? Kimura, Sotaro(@kimutansk) C なんでもやデ`タエンジニア @ ドワンゴ ? デ`タ蛍裂児Pの砿尖 ? デ`タにvBするj喘Sことなら寄悶なんでも C 挫きな室g蛍勸 ? ストリ`ムI尖麼にJVM貧 ? 蛍柊システム C 寄俳だと房うKafkaのO協朕 ? advertised.listeners ? cleanup.policy ? unclean.leader.election.enable
  • 3. Spark Structured Streamingとは ? Spark SQL貧でストリ`ムI尖アプリケ`ションを gにMむためのコンポ`ネント C Spark2.1狼のr泣でα井 C だが、databricksブログでシリ`ズ誘後吉其竃は謹い C バッチI尖と揖の圭隈でストリ`ムI尖を峰辛嬬 C Scala/Java/PythonのDataset/DataFrame APIで峰 C Dataset/DataFrameを喘いることで 夛晒デ`タとして恷m晒された彜Bで嘛 ? メモリ聞喘楚のs ? ベクトル處麻によるCPUリソ`スの嗤浸醵
  • 4. Spark Structured Streamingとは ? モダンなストリ`ムI尖の勣殆をレくカバ` C Out of orderなデ`タに ? Event Timeベ`スでI尖を佩うことが辛嬬 ? Watermarkにより、S否するW决もO協辛嬬 C Windowv方で匯協rgごとにデ`タを曝俳ってI尖辛嬬 ? 1蛍gの曝俳りで5蛍g蛍のデ`タをI尖などの Sliding Windowも旋喘辛嬬 C Accumulation ModeOutput/Update Modeも峺協辛嬬 ? ただし、I尖坪否でm喘辛嬬なモ`ドは泙蕕譴 C ストリ`ムI尖vB喘Zは參和のY創歌孚 ? http://niconare.nicovideo.jp/watch/kn2358
  • 5. gなアプリケ`ション箭 // Sparkアプリケ`ション伏撹 val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate() import spark.implicits._ // ロ`カルポ`ト貧にソケットを伏撹してデ`タを棋ち鞭け val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
  • 6. gなアプリケ`ション箭 // 秘薦デ`タをgZ阿坊峺 val words = lines.as[String].flatMap(_.split(" ")) // 秘薦gZ阿縫ウント val wordCounts = words.groupBy("value").count() // 鹿Y惚を飴悗垢戮謄灰鵐秋`ルに竃薦 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() // アプリケ`ションが翌何から唯峭されるまでg佩 query.awaitTermination()
  • 8. どうKafkaとB亊できるのか ? spark-sql-kafkaを聞喘 C 參和の峺協でKafkaのデ`タを函誼辛嬬 ? Topic + Partition ? Topic ? Topicのパタ`ン屎ア蹶Fで峺協辛嬬 C デ`タのメタ秤鵑睛rに函誼するため旋喘辛嬬 key: binary (nullable = true) value: binary (nullable = true) topic: string (nullable = true) partition: integer (nullable = true) offset: long (nullable = true) timestamp: timestamp (nullable = true) timestampType: integer (nullable = true)
  • 9. どうKafkaとB亊できるのか ? 聞喘貧の廣吭泣 C offsetはSpark箸WALとして砿尖されるため、 麿アプリケ`ションとConsumerGroupを喘いたB亊は音辛 ? gH駅勣になることはほぼないとは房いますが C 寄楚Topicをiむとマイクロバッチg佩rgはLくなる ? KafkaのTopic-PartitionがSparkの1Partitionに鬉垢襪燭 ? 輝たり念ではありますが C まだα井なのでAPIやその麿が笋錣訖苗榻圓△ ? 繁議にはWALがSparkのバ`ジョンアップで iめなくならないか、が掲械に伽い???
  • 10. gHのアプリケ`ション箭 ? gHにMんでみて、gにできたものは C ストリ`ミングETL ? KafkaのTopic兆各をデ`タのスキ`マに鬉気擦討けば、 Topicのパタ`ン峺協で1アプリケ`ションで匯凄I尖辛嬬 C 箭災塹造TopicをリアルタイムでQしてHDFSに誘秘 C example_distributor_action C example_audience_action C example_general_action ? Topic阿乏薦ディレクトリを蛍けたい ? 晩、rgg了で竃薦ディレクトリを蛍けたい ? 1蛍おきにg佩したい
  • 11. gHのアプリケ`ション箭 // Sparkアプリケ`ション伏撹 val sparkSession = SparkSession .builder .appName("StreamingETLExample") .getOrCreate() // example_で兵まるKafka Topicをiみzむ val kafkaDs = sparkSession.readStream.format("kafka"). option(kafka.bootstrap.servers", "host1:port1,host2:port2,host3:port3"). option("subscribePattern", "example_.*").load() // デ`タのQv方?r震の渇竃v方をUDFとしてO協 val exampleConvertUdf = udf(funcExampleConvert) val extractTimestampUdf = udf(funcExtractTimestamp)
  • 12. gHのアプリケ`ション箭 // デ`タをQし、グル`プ蛍けに駅勣 or 竃薦カラムにgる import sparkSession.implicits._ val convertedDs = kafkaDs.selectExpr("topic", "CAST(value AS STRING) as value"). withColumn("converted_value", exampleConvertUdf('value)). withColumn("data_timestamp_str", extractTimestampUdf('value)). withColumn("data_timestamp", unix_timestamp($"data_timestamp_str", "yyyy-MM-dd'T'HH:mm:ssXXX")). withColumn("date", from_unixtime($"data_timestamp", "yyyyMMdd")). withColumn("hour", from_unixtime($"data_timestamp", "HH")). selectExpr("topic", "date", "hour", "converted_value")
  • 13. gHのアプリケ`ション箭 // デ`タQY惚をHDFSに竃薦 // 竃薦ディレクトリパス箭は參和 // /data/converted/topic=example_general_action/date=20170706/hour=23 // ☆gHは竃薦g侯肝及でファイル方が湯れ貧がるので廣吭 convertedDs.toDF().writeStream. trigger(ProcessingTime("60 seconds")). partitionBy("topic", "date", "hour"). outputMode("append"). option("compression", "snappy"). option("checkpointLocation", "/data/checkpoint"). format("parquet"). start("/data/converted").awaitTermination()
  • 14. まとめ ? Spark Streaming?KafkaB亊は Structured Streamingで曳^議gにできる ? }方のTopicをまとめてiみzめるため、 ストリ`ミングETLはユ`スケ`スとして嗤李 ? ただしα井なので、 APIや、WALの侘塀の筝には廣吭
  • 15. Thank you for your attention! https://www.flickr.com/photos/savannahcorps/7409364642