REAL-TIME DATA PROCESSING AND ANALYSIS IN THE CLOUD
JERRY JALAVA - QVIK
Senior System Architect, Google Developer Expert, Authorised Trainer
JERRY@QVIK.FI | @W_I
WE PRODUCE MASSIVE AMOUNTS OF DATA
@W_I @QVIK
OVER 90% OF ALL THE DATA HAS BEEN GENERATED IN THE PAST FEW YEARS
BEING COMPETITIVE REQUIRES YOU TO BE ABLE TO ANALYSE THAT DATA FAST
BUT BUILDING THIS KIND OF INFRASTRUCTURE IS EXPENSIVE
THERE ARE MANY GREAT OPEN-SOURCE PROJECTS AVAILABLE
REFERENCE ARCHITECTURE
Ingest: devices / systems generating events → message queue
Processing: data processing
Storage: time-series database, data warehouse
Cloud Pub/Sub - A fully managed, global and scalable publish and subscribe service with guaranteed at-least-once message delivery
Cloud Dataflow - A fully managed, auto-scalable service for pipeline data processing in batch or streaming mode
BigQuery - A fully managed, petabyte-scale, low-cost enterprise data warehouse for analytics
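Because Cloud Pub/Sub guarantees at-least-once delivery, the same message can arrive more than once, so a consumer that must not double-count has to be idempotent. A minimal sketch, assuming every message carries a unique ID; the class and method names are hypothetical, and a real consumer would bound the set of seen IDs (for example by time) instead of keeping it forever:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical consumer that drops redelivered Pub/Sub messages by ID.
class DedupSketch {
    // IDs already processed; unbounded here only to keep the sketch short.
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    // Returns true if the message was processed, false if it was a redelivery.
    boolean onMessage(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // ID already seen: duplicate delivery, ignore it
        }
        processed.add(payload);
        return true;
    }

    List<String> processed() {
        return processed;
    }
}
```

Delivering `m1`, `m2` and then `m1` again processes two payloads; the second `m1` is ignored.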
REFERENCE ARCHITECTURE
Ingest: devices / systems generating events → Cloud Pub/Sub
Processing: Cloud Dataflow
Storage: Cloud Bigtable (time-series database), BigQuery (data warehouse)
DEMO
● Analyse “real-time” taxi data from NYC
● More than 20,000 events/s incoming
● 3M taxi rides (1 week of data)
● Get insights
● Live visualisation of the rides
● How do taxi rides from airports compare to taxi rides overall?
● Analyse archived data
DEMO ARCHITECTURE
Ingest: Taxis → Cloud Pub/Sub (messaging: ride telemetry)
Processing: Cloud Dataflow (aggregate) → Cloud Pub/Sub (messaging: display data) → Dashboard application
MULTIPLE DATA PROCESSING REQUIREMENTS
● Correctness, completeness, reliability, scalability and performance
● Continuous event processing
● Continuous result delivery
● Scalable ETL for continuous archival
● Analyst-ready big data sets
COUNT RIDES
Taxi data: (Lat X, Lon Y) @1:00, (Lat X, Lon Y) @1:01, (Lat K, Lon M) @1:03, (Lat K, Lon M) @2:30
Window In Time
{ [1:00, 2:00) → (X, Y) @1:00, (X, Y) @1:01, (K, M) @1:03 }
{ [2:00, 3:00) → (K, M) @2:30 }
Group In Space
{ (X, Y), [1:00, 2:00) → (X, Y) @1:00, (X, Y) @1:01 }
{ (K, M), [1:00, 2:00) → (K, M) @1:03 }
{ (K, M), [2:00, 3:00) → (K, M) @2:30 }
Count (output)
{ (X, Y), [1:00, 2:00) → 2 }
{ (K, M), [1:00, 2:00) → 1 }
{ (K, M), [2:00, 3:00) → 1 }
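The window, group and count steps above can be reproduced in plain Java, without Dataflow, to make the semantics concrete. This is an illustrative sketch only: it assumes one-hour fixed windows as in the example (the Dataflow pipeline uses one-second windows instead), times given as minutes since midnight, and already-condensed location keys; `CountRidesSketch` and `Point` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java illustration of "window in time, group in space, count".
class CountRidesSketch {
    // A ride point: a condensed location key and a time in minutes since midnight.
    static final class Point {
        final String location;
        final int minute;

        Point(String location, int minute) {
            this.location = location;
            this.minute = minute;
        }
    }

    // Start of the fixed window [start, start + size) that contains the minute.
    static int windowStart(int minute, int windowSizeMinutes) {
        return (minute / windowSizeMinutes) * windowSizeMinutes;
    }

    // Window in time and group in space via a combined key, then count per key.
    static Map<String, Long> count(Iterable<Point> points, int windowSizeMinutes) {
        Map<String, Long> counts = new TreeMap<>();
        for (Point p : points) {
            String key = p.location + "@" + windowStart(p.minute, windowSizeMinutes);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

With the four example points at 60, 61, 63 and 150 minutes and a 60-minute window, this yields the same three counts as the slide: 2 for (X, Y) in [1:00, 2:00), 1 for (K, M) in [1:00, 2:00) and 1 for (K, M) in [2:00, 3:00).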
COUNT RIDES: window in time → group in space → count
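import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

p.apply(PubsubIO.Read.topic(taxiInTopic)
        .withCoder(TableRowJsonCoder.of()))      // parse each message as a JSON TableRow
 .apply("window 1s",
        Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(1))))
 .apply("condense rides", MapElements.via(new CondenseRides()))
 .apply("count similar", Count.<LatLon, TableRow>perKey())   // emits KV<LatLon, Long>
 // Hypothetical step: FormatCounts (a SimpleFunction<KV<LatLon, Long>, TableRow>)
 // turns each count back into a TableRow so it can be written as JSON.
 .apply("format counts", MapElements.via(new FormatCounts()))
 .apply(PubsubIO.Write.topic(taxiOutTopic)
        .withCoder(TableRowJsonCoder.of()));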
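// Snaps each ride point to the centre of a ~100 m grid cell, so that nearby
// points share a key and Count.perKey() counts them together.
// LatLon is used as a key, so it needs a deterministic coder.
private static class CondenseRides
    extends SimpleFunction<TableRow, KV<LatLon, TableRow>> {
  @Override
  public KV<LatLon, TableRow> apply(TableRow t) {
    final float box = 0.001f; // very approximately 100 m
    float lat = Float.parseFloat(t.get("latitude").toString());
    float lon = Float.parseFloat(t.get("longitude").toString());
    float roundedLat = (float) Math.floor(lat / box) * box + box / 2;
    float roundedLon = (float) Math.floor(lon / box) * box + box / 2;
    LatLon key = new LatLon(roundedLat, roundedLon);
    return KV.of(key, t);
  }
}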
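The rounding in CondenseRides can be checked on its own. A minimal stand-alone sketch of the same arithmetic (hypothetical class name, plain Java, no Dataflow types) that tests whether two points fall into the same ~100 m cell:

```java
// Hypothetical stand-alone version of the CondenseRides rounding.
class GridSnap {
    static final float BOX = 0.001f; // cell size in degrees, roughly 100 m

    // Returns the centre of the grid cell containing the coordinate.
    static float snap(float degrees) {
        return (float) Math.floor(degrees / BOX) * BOX + BOX / 2;
    }

    // Two points share a key when both coordinates snap to the same cell centre.
    static boolean sameCell(float lat1, float lon1, float lat2, float lon2) {
        return snap(lat1) == snap(lat2) && snap(lon1) == snap(lon2);
    }
}
```

Two points a few tens of metres apart in Midtown, (40.7581, -73.9855) and (40.7583, -73.9853), share a cell, while moving the latitude to 40.7595 (about 150 m north) lands in a different cell.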
$ java com.google.codelabs.dataflow.CountRides \
    --streaming=true --project=arctic15-demo \
    --sourceProject=arctic15-demo --sourceTopic=taxifeed1 \
    --sinkProject=arctic15-demo --sinkTopic=visualisation-sink-1 \
    --runner=DataflowPipelineRunner --zone=europe-west1-c \
    --numWorkers=3 --stagingLocation=gs://arctic15-demo
10X REDUCTION IN MESSAGES PER SECOND
GETTING INSIGHTS
HOW DO THE TAXI RIDES FROM AIRPORTS COMPARE TO OVERALL TAXI RIDES?
AIRPORT RIDES
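Read from Pub/Sub
p.apply(PubsubIO.Read.topic(inputTopic)
        .withCoder(TableRowJsonCoder.of()))
 .apply("Key By Ride ID", MapElements
        .via((TableRow ride) -> KV.of(String.valueOf(ride.get("ride_id")), ride))
        .withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))
 // Windowing and triggering are set on one Window transform; a trigger on
 // non-global windows also needs an allowed lateness.
 .apply(Window.<KV<String, TableRow>>into(Sessions.withGapDuration(TEN_MIN))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes())
 .apply(Combine.perKey(new AccumulatePoints()))
 .apply(ParDo.of(new FilterAtAirport()))
 .apply(ParDo.of(new ExtractLatest()))
 .apply(PubsubIO.Write.topic(outputTopic)
        .withCoder(TableRowJsonCoder.of()));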
Key by ride ID, so that all the points from the same ride are grouped together.
Session windows put all the points of the same ride into one window, and let the ride data be garbage-collected once no new points for that ride have shown up for ten minutes.
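Session windowing can be illustrated outside Dataflow. In the Dataflow/Beam model each element gets its own window [t, t + gap), and overlapping windows are merged, so a session ends one gap after its last element. A minimal plain-Java sketch of that merge for the timestamps of a single ride (hypothetical class name, timestamps in minutes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of session windowing for the points of one ride.
class SessionSketch {
    // Each point opens a window [t, t + gap); overlapping windows are merged.
    // Returns the merged sessions as {start, end} pairs, end exclusive.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t < last[1]) {
                last[1] = t + gap; // overlaps: extend the current session
            } else {
                result.add(new long[] {t, t + gap}); // no overlap: new session
            }
        }
        return result;
    }
}
```

With a ten-minute gap, points at 0, 3, 8, 30 and 35 minutes form two sessions, [0, 18) and [30, 45). Two points exactly ten minutes apart end up in separate sessions, because the half-open windows [0, 10) and [10, 20) do not overlap.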
Triggering delivers the contents of the ride window early and often:
- AfterPane.elementCountAtLeast(1) fires as soon as even a single element has arrived
- Repeatedly.forever keeps the trigger firing, so updates keep coming
- accumulatingFiredPanes makes every firing contain all the points seen so far, giving a full view of the ride
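Because this trigger fires after every element, the accumulation mode decides what each firing contains. A plain-Java simulation (hypothetical names, not the Dataflow API) contrasting accumulatingFiredPanes() with the alternative, discardingFiredPanes(); it assumes elements arrive one at a time, whereas in a real pipeline one firing may include several new elements:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a trigger that fires after every element.
class PaneSketch {
    // accumulating = true mimics accumulatingFiredPanes(): each pane holds
    // everything seen so far. accumulating = false mimics discardingFiredPanes():
    // each pane holds only the elements since the previous firing.
    static List<List<String>> fire(List<String> arrivals, boolean accumulating) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String element : arrivals) {
            buffer.add(element);
            panes.add(new ArrayList<>(buffer)); // trigger fires: emit a pane
            if (!accumulating) {
                buffer.clear(); // discarding mode forgets what was already emitted
            }
        }
        return panes;
    }
}
```

For arrivals p1, p2, p3, accumulating mode emits [p1], [p1, p2], [p1, p2, p3], while discarding mode emits [p1], [p2], [p3]. The airport pipeline needs the accumulating view so the combiner always sees the pickup point.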
Every time the window fires, the combiner determines how the points in the window are combined. AccumulatePoints():
- keeps the pickup location, which is needed to know whether the ride started at the airport
- keeps the most recent point, so that updates about the ride can be emitted continuously
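The deck does not show AccumulatePoints itself. A minimal sketch of the kind of accumulator it describes, assuming each point has a timestamp and that the pickup is the earliest point of the ride; the class and field names are hypothetical. Like a CombineFn accumulator, it can absorb single points and merge partial results built elsewhere:

```java
// Hypothetical accumulator in the spirit of AccumulatePoints: keeps the
// pickup (assumed to be the earliest point) and the latest point of one ride.
class RideAccumulator {
    static final class Point {
        final long timestamp;
        final double lat;
        final double lon;

        Point(long timestamp, double lat, double lon) {
            this.timestamp = timestamp;
            this.lat = lat;
            this.lon = lon;
        }
    }

    Point pickup; // earliest point seen so far: where the ride started
    Point latest; // most recent point seen so far: what the dashboard shows

    // Fold one ride point into the accumulator (addInput in CombineFn terms).
    void add(Point p) {
        if (pickup == null || p.timestamp < pickup.timestamp) {
            pickup = p;
        }
        if (latest == null || p.timestamp > latest.timestamp) {
            latest = p;
        }
    }

    // Merge a partial accumulator, e.g. one built on another worker
    // (mergeAccumulators in CombineFn terms). The min and max over the union
    // depend only on each side's own min and max.
    void merge(RideAccumulator other) {
        if (other.pickup != null) {
            add(other.pickup);
        }
        if (other.latest != null) {
            add(other.latest);
        }
    }
}
```

With this shape, a filter only needs to inspect `pickup`, and extracting the output only needs to emit `latest`.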
FilterAtAirport compares the pickup point kept in the accumulator with the airport's lat/lon coordinates to determine whether it is an airport pickup.
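The airport check can be as simple as testing whether the pickup point falls inside a rectangle around the airport. A minimal sketch, assuming a rectangular bounding box; the class name is hypothetical, and the coordinates only roughly enclose JFK airport for illustration, they are not the values used in the demo:

```java
// Hypothetical bounding-box test for airport pickups.
class AirportFilter {
    // Illustrative box roughly around JFK airport (not the demo's actual values).
    static final double MIN_LAT = 40.62;
    static final double MAX_LAT = 40.67;
    static final double MIN_LON = -73.83;
    static final double MAX_LON = -73.74;

    // True when the pickup point lies inside the box (edges included).
    static boolean isAirportPickup(double pickupLat, double pickupLon) {
        return pickupLat >= MIN_LAT && pickupLat <= MAX_LAT
            && pickupLon >= MIN_LON && pickupLon <= MAX_LON;
    }
}
```

A pickup at JFK (about 40.6413, -73.7781) passes the filter, while one in Midtown Manhattan (about 40.758, -73.9855) does not.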
For the output we only need the latest point from the accumulator, which ExtractLatest pulls out.
Finally, the resulting latest point is written to Pub/Sub.
UPDATED DEMO ARCHITECTURE
Ingest: Taxis → Cloud Pub/Sub (messaging: ride telemetry)
Processing: Cloud Dataflow (aggregate) → Cloud Pub/Sub (messaging: display data) → Dashboard application
Insights: Cloud Dataflow ETL pipeline (archival-grade aggregates) → BigQuery data warehouse (analytics)
● Create another ETL pipeline: Pub/Sub → BigQuery
● Composition: save the output of both the regular taxi data and the filtered airport data
RIDE DATA OVER TIME
APACHE BEAM
● In early 2016, Google announced its intention to move the Dataflow programming model and SDKs to the Apache Software Foundation
● Apache Beam is now a top-level project
SOME RESOURCES
● cloud.google.com/dataflow
● beam.apache.org
● codelabs.developers.google.com
● Big Data facts (forbes.com)
THANK YOU!
LET’S CREATE IT TOGETHER
jerry@qvik.fi | @W_I


Arctic15 keynote
