Spark Streaming Snippets
@atty303
Spark Streaming apps built so far
• rtg -- real-time retargeting data generation
• pixelwriter -- writing mark data for Dynamic Creative
• feedsync -- product feed synchronization
• segment:elastic -- real-time segmentation
• (logblend -- joining disparate event logs)
From these apps, I picked out the parts that seemed worth sharing:
? SparkBoot
? Cron
? UpdatableBroadcast
? Connector
? SparkStreamingSpec
SparkBoot
h"ps://gist.github.com/a"y303/c83f3c8cb8a930951be0
• Wraps the main method of a Spark app (a rough sketch follows below)
• Provides the SparkContext / StreamingContext
• Configuration management
• Customize by shipping application.conf with spark-submit --files
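The Boot traits themselves live in the gist above; the following is only a rough sketch of what SparkBatchBoot could look like. The SparkApp trait and the ConfigFactory-based appConfig are assumptions made so the batch example below compiles in isolation, not the author's actual code.

import scala.util.Try

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch only -- the real SparkBoot is in the gist linked above.
trait SparkApp {
  def run(): Try[Int]
}

trait SparkBatchBoot {
  def appName: String
  def mkApp(sc: SparkContext, args: Array[String]): SparkApp

  // application.conf shipped via `spark-submit --files` lands in the
  // working directory, where ConfigFactory.load() picks it up
  lazy val appConfig: Config = ConfigFactory.load()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName(appName))
    val exitCode = try mkApp(sc, args).run().getOrElse(1) finally sc.stop()
    sys.exit(exitCode)
  }
}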
For a batch app
object TrainingBatchApp extends SparkBatchBoot {
val appName = "TrainingBatchApp"
override def mkApp(sc: SparkContext, args: Array[String]): SparkApp =
new TrainingBatchApp(sc, appConfig)
}
class TrainingBatchApp(
sc: SparkContext, appConfig: Config)
extends SparkApp {
def run(): Try[Int] = Try {
0
}
}
For a streaming app
object PredictStreamingApp extends SparkStreamingBoot {
val appName = "PredictStreamingApp"
override val checkpointPath: String = "app.training.streaming.checkpoint-path"
override val batchDurationPath: String = "app.training.streaming.batch-duration"
override def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp =
new PredictStreamingApp(ssc, batchDuration, appConfig)
}
class PredictStreamingApp(
ssc: StreamingContext, appConfig: Config)
extends SparkApp {
val sparkContext = ssc.sparkContext
def run(): Try[Int] = Try {
0
}
}
Doing cron-like things
• Some processing needs to run at fixed intervals longer than the batch-duration
• e.g. refreshing master data read from an external data store
def repeatedly(streamingContext: StreamingContext, interval: Duration)
(f: (SparkContext, Time) => Unit): Unit = {
// DStream that generates trigger ticks
val s = streamingContext.queueStream(
mutable.Queue.empty[RDD[Unit]],
oneAtATime = true,
defaultRDD = streamingContext.sparkContext.makeRDD(Seq(())))
.repartition(1)
s.window(s.slideDuration, interval)
.foreachRDD { (rdd, time) =>
f(rdd.context, time)
rdd.foreach(_ => ())
}
}
Usage
repeatedly(streamingContext, Durations.seconds(300)) { (sc, time) =>
// @driver: code that runs every 5 minutes
}
An updatable Broadcast
• Want to update a Broadcast variable after streaming has started
• A simple wrapper that holds a plain Broadcast and swaps it out
/**
* A Broadcast that can be updated (re-broadcast) after streaming has started
*
* https://gist.github.com/Reinvigorate/040a362ca8100347e1a6
* @author Reinvigorate
*/
case class UpdatableBroadcast[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
Usage
def loadModel(): Model = ???
val ub: UpdatableBroadcast[Model] =
UpdatableBroadcast(streamingContext, loadModel())
StreamingUtil.repeatedly(streamingContext, refreshInterval) { (_, _) =>
ub.update(loadModel())
}
dstream.foreachRDD { rdd =>
val model = ub.value
// use model
}
Abstracting external connections
• When Spark accesses an external resource, each Executor has to maintain its own connection
• The Driver cannot send "the connection itself" to the Executors
• So the "how to connect" is abstracted as a Connector trait (a usage sketch follows the trait below)
trait Connector[A] extends java.io.Closeable with Serializable {
def get: A
def close(): Unit
def using[B](f: A => B): B = f(get)
}
case class PoolAerospikeConnector(name: Symbol, config: AerospikeConfig)
extends Connector[AerospikeClient] {
def get: AerospikeClient =
PoolAerospikeConnector.defaultHolder.getOrCreate(name, mkClient)
def close(): Unit =
PoolAerospikeConnector.defaultHolder.remove(name)(
AerospikeConnector.aerospikeClientClosable)
private val mkClient: () => AerospikeClient =
() => new AerospikeClient(config.clientPolicy.underlying,
config.asHosts:_*)
}
object PoolAerospikeConnector {
private val defaultHolder = new DefaultResourceHolder[AerospikeClient]
}
case class ScalikeJdbcConnector(name: Symbol, config: Config)
extends Connector[Unit] {
def get: Unit = {
if (!ConnectionPool.isInitialized(name)) {
// Load MySQL JDBC driver class
Class.forName("com.mysql.jdbc.Driver")
ConnectionPool.add(name, config.getString("url"),
config.getString("user"), config.getString("password"))
}
}
def close(): Unit = ConnectionPool.close(name)
}
case class KafkaProducerConnector[K :ClassTag, V :ClassTag](
name: Symbol, config: java.util.Map[String, AnyRef])
extends Connector[ScalaKafkaProducer[K, V]] {
def get: ScalaKafkaProducer[K, V] =
KafkaProducerConnector.defaultHolder.getOrCreate(name, mkResource)
.asInstanceOf[ScalaKafkaProducer[K, V]]
def close(): Unit = KafkaProducerConnector.defaultHolder.remove(name)(
KafkaProducerConnector.kafkaProducerClosable)
private val mkResource = () => {
val keySer = mkDefaultSerializer[K](ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
val valueSer = mkDefaultSerializer[V](ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
new ScalaKafkaProducer[K, V](
new KafkaProducer[K, V](config, keySer.orNull, valueSer.orNull))
}
private def mkDefaultSerializer[A :ClassTag](configKey: String): Option[Serializer[A]] = {
if (!config.containsKey(configKey)) {
implicitly[ClassTag[A]].runtimeClass match {
case c if c == classOf[Array[Byte]] => Some(new ByteArraySerializer().asInstanceOf[Serializer[A]])
case c if c == classOf[String] => Some(new StringSerializer().asInstanceOf[Serializer[A]])
case _ => None
}
} else None
}
}
Testing Spark Streaming
h"ps://gist.github.com/a"y303/18e64e718f0cf3261c0e
class CountProductSpec extends SpecWithJUnit with SparkStreamingSpec {
val batchDuration: Duration = Duration(1000)
"Count" >> {
val (sourceQueue, resultQueue) = startQueueStream[Product, (Product, Long)] { inStream =>
// Streaming logic under test
CountProduct(inStream).run(sc)
}
// Feed test data into the input queue
sourceQueue += sc.parallelize(Seq(
Product(1, "id"), Product(1, "id"), Product(2, "id")))
// Advance the clock
advance()
// Assert on the output data
resultQueue.dequeue must eventually(contain(exactly(
Product(1, "id") -> 2L, Product(2, "id") -> 1L
)))
}
}
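SparkStreamingSpec and startQueueStream are defined in the gist above; roughly, input goes in through a queue-backed DStream and every output batch is drained into a plain queue the assertions can poll. A simplified sketch under those assumptions (the real signature differs, and advance() with the manual clock is elided here):

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Sketch: wire a test pipeline between an input queue and an output queue
def startQueueStream[In: ClassTag, Out: ClassTag](ssc: StreamingContext)
    (mkStream: DStream[In] => DStream[Out])
    : (mutable.Queue[RDD[In]], mutable.Queue[Seq[Out]]) = {
  val sourceQueue = mutable.Queue.empty[RDD[In]]
  val resultQueue = mutable.Queue.empty[Seq[Out]]

  val inStream = ssc.queueStream(sourceQueue, oneAtATime = true)
  mkStream(inStream).foreachRDD { rdd =>
    // collect each output batch so the spec can dequeue and assert on it
    resultQueue += rdd.collect().toSeq
  }
  ssc.start()
  (sourceQueue, resultQueue)
}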
