This document discusses real-time data processing and analysis in the cloud. Massive amounts of data are being generated and require fast analysis; building the infrastructure for this yourself is expensive, but many open-source projects are available. The document demonstrates processing New York City taxi ride data in real time using Google Cloud technologies such as Pub/Sub, Dataflow, and BigQuery, and shows how to analyse airport rides separately to compare them with taxi rides overall. Finally, it covers Apache Beam and points to additional resources.
1. Senior System Architect, Google Developer Expert, Authorised Trainer
REAL-TIME DATA PROCESSING
AND ANALYSIS IN THE CLOUD
JERRY JALAVA - QVIK
JERRY@QVIK.FI | @W_I
8. Cloud Pub/Sub - A fully managed, global and scalable publish-and-subscribe service with guaranteed at-least-once message delivery
Cloud Dataflow - A fully managed, auto-scaling service for data processing pipelines in batch or streaming mode
BigQuery - A fully managed, petabyte-scale, low-cost enterprise data warehouse for analytics
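To make the roles of these services concrete, here is a minimal, hypothetical sketch of a streaming pipeline that reads taxi events from Pub/Sub and appends them to a BigQuery table. It uses the current Apache Beam API rather than the Dataflow 1.x SDK shown on the later slides, and the project, topic, and table names are placeholders, not the demo's actual resources.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class TaxiRidesToBigQuery {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadRides", PubsubIO.readStrings()
            .fromTopic("projects/my-project/topics/taxi-rides"))   // placeholder topic
     .apply("ToTableRow", MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // A real pipeline would parse the JSON payload into individual fields.
            .via((String json) -> new TableRow().set("raw_json", json)))
     .setCoder(TableRowJsonCoder.of())
     .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:taxi.rides")                           // placeholder table, assumed to exist
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}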
12. DEMO
- Analyse "real-time" taxi data from NYC
- >20,000 events/s incoming
- 3M taxi rides (1 week of data)
- Get insights
- Live visualisation of the rides
- How do the taxi rides from airports compare to taxi rides overall?
- Analyse archived data
30. AIRPORT RIDES
Key By Ride ID to group together ride points from the same ride
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
31. AIRPORT RIDES
Sessions allow us to create a window with all the points from the same ride grouped together, and then garbage-collect the ride data once no more ride points show up for ten minutes
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
32. AIRPORT RIDES
Triggering delivers the contents of the ride window early and often:
elementCountAtLeast(1) ensures that we get the first values as soon as even a single element shows up
Repeatedly.forever ensures we keep getting updates
accumulatingFiredPanes ensures we get a full view of the data
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
33. AIRPORT RIDES
Every time our window is triggered, the Accumulator determines how the data points in the window are combined
AccumulatePoints():
- Keeps the pickup location, necessary to know if the ride started at the airport
- Keeps the most recent value, to continuously emit updates about the ride
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
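The AccumulatePoints class itself is not shown in the slides. A minimal sketch of a combiner along these lines might look as follows; the RideAccum accumulator type and the "timestamp" field name are assumptions, not the speaker's actual code.

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;
import com.google.api.services.bigquery.model.TableRow;

public class AccumulatePoints
    extends Combine.CombineFn<TableRow, AccumulatePoints.RideAccum, AccumulatePoints.RideAccum> {

  /** Accumulator: the first (pickup) and most recent point seen so far for one ride. */
  public static class RideAccum implements Serializable {
    TableRow pickup;   // earliest point by timestamp -> where the ride started
    TableRow latest;   // most recent point -> current position of the ride
  }

  private static long ts(TableRow row) {
    // Assumes each ride point carries a numeric "timestamp" field.
    return ((Number) row.get("timestamp")).longValue();
  }

  @Override
  public RideAccum createAccumulator() {
    return new RideAccum();
  }

  @Override
  public RideAccum addInput(RideAccum acc, TableRow point) {
    if (acc.pickup == null || ts(point) < ts(acc.pickup)) {
      acc.pickup = point;   // keep the pickup location
    }
    if (acc.latest == null || ts(point) > ts(acc.latest)) {
      acc.latest = point;   // keep the most recent value
    }
    return acc;
  }

  @Override
  public RideAccum mergeAccumulators(Iterable<RideAccum> accs) {
    RideAccum merged = createAccumulator();
    for (RideAccum acc : accs) {
      if (acc.pickup != null) { merged = addInput(merged, acc.pickup); }
      if (acc.latest != null) { merged = addInput(merged, acc.latest); }
    }
    return merged;
  }

  @Override
  public RideAccum extractOutput(RideAccum acc) {
    return acc;
  }
}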
34. AIRPORT RIDES
Look at the pickup point in the accumulator, and compare it with lat/long coordinates to determine if it's an airport pickup
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
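FilterAtAirport is likewise not shown. Building on the RideAccum sketch above, a hypothetical version could compare the pickup point against an airport bounding box; the JFK coordinates, the String keys, and the "latitude"/"longitude" field names are assumptions.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import com.google.api.services.bigquery.model.TableRow;

public class FilterAtAirport
    extends DoFn<KV<String, AccumulatePoints.RideAccum>, KV<String, AccumulatePoints.RideAccum>> {

  // Approximate JFK bounding box; the real demo may use different airports/coordinates.
  private static final double MIN_LAT = 40.63, MAX_LAT = 40.66;
  private static final double MIN_LON = -73.82, MAX_LON = -73.75;

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow pickup = c.element().getValue().pickup;
    if (pickup == null) {
      return;
    }
    double lat = ((Number) pickup.get("latitude")).doubleValue();
    double lon = ((Number) pickup.get("longitude")).doubleValue();
    if (lat >= MIN_LAT && lat <= MAX_LAT && lon >= MIN_LON && lon <= MAX_LON) {
      c.output(c.element());   // ride started at the airport -> keep it
    }
    // otherwise emit nothing, dropping the ride from the airport stream
  }
}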
35. AIRPORT RIDES
For writing output, we only care about the latest point from the accumulator
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
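ExtractLatest is also not shown; continuing the same sketch, it would simply emit the most recent point from each accumulated ride, with the real pipeline serializing that point (for example to JSON) before Pub/Sub writes it out.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import com.google.api.services.bigquery.model.TableRow;

public class ExtractLatest extends DoFn<KV<String, AccumulatePoints.RideAccum>, TableRow> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow latest = c.element().getValue().latest;
    if (latest != null) {
      c.output(latest);   // the latest known position of this airport ride
    }
  }
}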
36. AIRPORT RIDES
We write the resulting latest point to Pub/Sub
p.apply(PubsubIO.Read(inputTopic))
 .apply("Key By Ride ID", MapElements.via(
     (TableRow ride) -> KV.of(ride.get("ride_id"), ride)))
 .apply(Window.into(Sessions.withGapDuration(TEN_MIN)))
 .apply(Window.triggering(Repeatedly.forever(
         AfterPane.elementCountAtLeast(1)))
     .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write(outputTopic));
42. APACHE BEAM
- In early 2016, Google announced their intention to move the Dataflow programming model and SDKs to the Apache Software Foundation
- Apache Beam is now a top-level project