Processing planetary sized datasets
# UserId, ActivityId, Latitude, Longitude, Timestamp
101528757,285751033,51.517227,-0.101553,1429087808
101528757,285751033,51.517296,-0.101817,1429087812
101528757,285751033,51.517353,-0.102064,1429087816
101528757,285751033,51.517445,-0.102144,1429087820
101528757,285751033,51.517475,-0.102259,1429087824
101528757,285751033,51.51743,-0.102343,1429087828
101528757,285751033,51.517338,-0.102309,1429087837
101528757,285751033,51.517307,-0.102303,1429087857
101528757,285751033,51.517296,-0.102346,1429087864
101528757,285751033,51.517284,-0.102388,1429087877
101528757,285751033,51.51729,-0.102321,1429087959
101528757,285751033,51.51729,-0.102248,1429087961
101528757,285751033,51.517338,-0.102088,1429087965
101528757,285751033,51.51737,-0.10196,1429087969
Total Size: 560GB
Activities: 2.5 million
GPS Locations: 3.5 billion
• Many activities per user.
• Want to be able to pull a time range of user locations for activity display.
User Id Timestamp Latitude Longitude

10152875766888406 1445374423000 36.966819 -122.012298
10152875766888406 1445377625000 36.966845 -122.012248
10152875766888406 1445377627000 36.966877 -122.012228
10152875766888406 1445377629000 36.966913 -122.012236
10152875766888406 1445377630000 36.966946 -122.012236
10152875766888406 1445377631000 36.966984 -122.012263
10152875766888406 1445379512000 36.967027 -122.012281
This is a challenge with a large dataset:
• A traditional relational database (e.g. Postgres) typically requires hand sharding to scale to PBs of data.
• Highly indexed non-relational solutions (e.g. MongoDB) can be very expensive.
• Lightly indexed solutions are a good fit because we really only have one query we need to execute against the data.
PartitionKey (userId)  RowKey (timestamp)  Latitude   Longitude
10152875766888406      1445377623000       36.966819  -122.012298
10152875766888406      1445377625000       36.966845  -122.012248
10152875766888406      1445377627000       36.966877  -122.012228
10152875766888406      1445377629000       36.966913  -122.012236
10152875766888406      1445377630000       36.966946  -122.012236
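The access pattern behind this layout can be sketched without any cloud service. The following is a minimal in-memory stand-in, not Azure Table Storage itself: the class name `LocationTable` and its methods are hypothetical, and it only illustrates why keeping rows sorted by timestamp inside a per-user partition makes the one query we need (a time range for one user) cheap.

```python
from bisect import bisect_left, bisect_right, insort
from collections import defaultdict


class LocationTable:
    """Hypothetical in-memory stand-in for a (PartitionKey, RowKey) table."""

    def __init__(self):
        # userId (partition key) -> list of (timestamp_ms, lat, lon),
        # kept sorted by timestamp (row key).
        self._partitions = defaultdict(list)

    def insert(self, user_id, timestamp_ms, latitude, longitude):
        insort(self._partitions[user_id], (timestamp_ms, latitude, longitude))

    def query_range(self, user_id, start_ms, end_ms):
        """Return rows for one user with start_ms <= timestamp <= end_ms."""
        rows = self._partitions.get(user_id, [])
        lo = bisect_left(rows, (start_ms,))
        hi = bisect_right(rows, (end_ms, float("inf"), float("inf")))
        return rows[lo:hi]


# Rows taken from the table above.
table = LocationTable()
user = 10152875766888406
table.insert(user, 1445377623000, 36.966819, -122.012298)
table.insert(user, 1445377625000, 36.966845, -122.012248)
table.insert(user, 1445377627000, 36.966877, -122.012228)
table.insert(user, 1445377629000, 36.966913, -122.012236)
table.insert(user, 1445377630000, 36.966946, -122.012236)

rows = table.query_range(user, 1445377625000, 1445377629000)
```

Because the lookup first selects a single partition and then binary-searches the sorted row keys, its cost depends on the size of the result rather than on the size of the whole dataset.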
• Want to query a set of activities in a bounding box.
• Also want to filter activities based on distance and duration.
activity id  start (sec)  finish (sec)  distance (m)  duration (m)  bbox (geometry)
101528       1445377625   1445383025    50023         6222          [-104.990, 39.7392...
101643       1445362577   1445373616    28778         2498          [-122.01228, 36.96
101843       1445377627   1445382432    4629          701           [0.1278, 51.5074
101901       1445362577   1445374713    99691         14232         [139.6917, 35.699...
102102       1445374713   1445374713    25259         6657          [1.3521, 103.8129
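The kind of query this table has to answer can be sketched in plain Python. This is only an illustration of the filtering logic, not the PostGIS query used in the project: the `Activity` type, the helper names, the choice of seconds for duration, and the sample bounding boxes are all assumptions (the boxes on the slide are truncated, so made-up ones are used here).

```python
from dataclasses import dataclass


@dataclass
class Activity:
    activity_id: int
    distance_m: float
    duration_s: float   # assumed unit: seconds
    bbox: tuple         # (min_lon, min_lat, max_lon, max_lat)


def bboxes_intersect(a, b):
    """True if two (min_lon, min_lat, max_lon, max_lat) boxes overlap."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def find_activities(activities, query_bbox, max_distance_m=None, min_duration_s=None):
    """Activities overlapping query_bbox, optionally filtered by distance/duration."""
    matches = []
    for activity in activities:
        if not bboxes_intersect(activity.bbox, query_bbox):
            continue
        if max_distance_m is not None and activity.distance_m > max_distance_m:
            continue
        if min_duration_s is not None and activity.duration_s < min_duration_s:
            continue
        matches.append(activity)
    return matches


# Hypothetical sample data: two activities near Dublin, one near Santa Cruz.
activities = [
    Activity(1, 8000, 2700, (-6.30, 53.30, -6.20, 53.40)),
    Activity(2, 25000, 5400, (-6.35, 53.32, -6.10, 53.45)),
    Activity(3, 5000, 1800, (-122.02, 36.95, -122.00, 36.98)),
]
dublin = (-6.40, 53.25, -6.05, 53.45)

short_near_dublin = find_activities(activities, dublin, max_distance_m=10_000)
long_near_dublin = find_activities(activities, dublin, min_duration_s=3600)
```

A linear scan like this only works for tiny inputs. The point of a database with spatial and column indexes is to answer the same question without touching every row.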
Location Data (Azure Table Storage)
user id            timestamp   latitude   longitude
10152875766888406  1445377623  36.966819  -122.012298
10152875766888406  1445377625  36.966845  -122.012248
10152875766888406  1445383025  36.966913  -122.012236
10152875766888406  1445383030  36.966946  -122.012236

Activity Data (Postgres + PostGIS)
activity id  start       finish      bbox
101528       1445362577  1445373616  [-104.990, 39.7392...
101643       1445377625  1445383025  [-122.01228, 36.96
101843       1445377627  1445382432  [0.1278, 51.5074
101901       1445362577  1445374713  [139.6917, 35.699...
102102       1445374713  1445374713  [1.3521, 103.8129
• Total number of location samples in a geographical area.
• Whole dataset operation.
• Based on Apache Spark.
• Offered as a service in Azure as HDInsight Spark.
• Can think of it as Hadoop: The Next Generation.
  • Better performance (10-100x)
  • Cleaner programming model
• Divides world up into tiles.
• Each tile has four children at the next higher zoom level.
• Maps 2-dimensional space to 1 dimension.
For each location, map to tiles at every zoom level:
(36.9741, -122.0308) → [
  (10_398_164, 1), (11_797_329, 1),
  (12_1594_659, 1), (13_3189_1319, 1),
  (14_6378_2638, 1), (15_12757_5276, 1),
  (16_25514_10552, 1), (17_51028_21105, 1),
  (18_102057_42211, 1)
]
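The tile ids in this mapping can be derived with the standard Web Mercator (XYZ or "slippy map") tile formula. The sketch below is an assumption about how such ids could be computed, not the geotile library's actual code: the function names are hypothetical, and the `zoom_row_col` id layout is inferred from the example above.

```python
import math


def tile_id(latitude, longitude, zoom):
    """Return a 'zoom_row_col' id for the XYZ tile containing the point."""
    n = 2 ** zoom  # tiles per side at this zoom level
    lat_rad = math.radians(latitude)
    col = math.floor((longitude + 180.0) / 360.0 * n)
    row = math.floor((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    # Clamp so points on the map edges still land on a valid tile.
    col = min(max(col, 0), n - 1)
    row = min(max(row, 0), n - 1)
    return f"{zoom}_{row}_{col}"


def tile_ids_for_zoom_levels(latitude, longitude, min_zoom, max_zoom):
    """Tile ids for one point at every zoom level in [min_zoom, max_zoom]."""
    return [tile_id(latitude, longitude, z) for z in range(min_zoom, max_zoom + 1)]


tile_ids = tile_ids_for_zoom_levels(36.9741, -122.0308, 10, 18)
```

For the point in the example, the zoom level 10 result is `10_398_164`, matching the first entry of the mapping. Each further zoom level doubles the row and column resolution, which is why every tile has exactly four children.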
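def tile_id_mapper(location):
    tileMappings = []
    # Compute the id of the tile containing this location at each zoom level.
    tileIds = Tile.tile_ids_for_zoom_levels(
        location['latitude'],
        location['longitude'],
        MIN_ZOOM_LEVEL,
        MAX_ZOOM_LEVEL
    )
    # Emit one (tileId, 1) pair per zoom level so the reducer can sum them.
    for tileId in tileIds:
        tileMappings.append(
            (tileId, 1)
        )
    return tileMappings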
Reduce all these mappings with the same key into an aggregate value:
(10_398_164, 151) ← [
  (10_398_164, 15), (10_398_164, 28),
  (10_398_164, 29), (10_398_164, 17),
  (10_398_164, 31), (10_398_164, 2),
  (10_398_164, 16), (10_398_164, 2),
  (10_398_164, 11)
]
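The reduction above can be checked locally with a few lines of plain Python. This is a single-machine sketch of the semantics only; the helper name `reduce_by_key` is hypothetical and does none of the distributed work that Spark performs.

```python
def reduce_by_key(pairs, combine):
    """Fold (key, value) pairs into one value per key using combine(a, b)."""
    totals = {}
    for key, value in pairs:
        totals[key] = combine(totals[key], value) if key in totals else value
    return totals


# The intermediate aggregates from the example above.
partials = [
    ("10_398_164", 15), ("10_398_164", 28),
    ("10_398_164", 29), ("10_398_164", 17),
    ("10_398_164", 31), ("10_398_164", 2),
    ("10_398_164", 16), ("10_398_164", 2),
    ("10_398_164", 11),
]

totals = reduce_by_key(partials, lambda agg1, agg2: agg1 + agg2)
```

Because addition is associative and commutative, the partial sums can be combined in any order and on any machine, which is what lets the work be spread across a cluster.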
Building the heatmap then boils down to this in Spark:

lines = sc.textFile('wasb://locations@loc.blob.core.windows.net/')
locations = lines.flatMap(json_loader)
# Parentheses let the chained calls span several lines.
heatmap = (locations
    .flatMap(tile_id_mapper)
    .reduceByKey(lambda agg1, agg2: agg1 + agg2))
heatmap.saveAsTextFile('wasb://heatmap@loc.blob.core.windows.net/')
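The slides do not show `json_loader`. Since it is passed to `flatMap`, it has to return an iterable per input line, which also gives it a natural way to drop bad records by returning an empty list. A minimal sketch, assuming one JSON object per line with the `latitude` and `longitude` keys that `tile_id_mapper` reads:

```python
import json


def json_loader(line):
    """Parse one text line into a list holding zero or one location dicts.

    Hypothetical implementation: returning a list lets flatMap silently
    skip blank or malformed lines instead of failing the whole job.
    """
    line = line.strip()
    if not line:
        return []
    try:
        record = json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []
    if not isinstance(record, dict):
        return []
    if 'latitude' not in record or 'longitude' not in record:
        return []
    return [record]


parsed = json_loader('{"latitude": 36.9741, "longitude": -122.0308}')
```

Whether to skip or fail on malformed input is a design choice. Skipping keeps a long-running job alive, at the cost of hiding data quality problems unless the dropped lines are counted somewhere.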
6_25_31
9_201_249
9_201_250
9_201_248
9_201_245
9_201_247
8_100_124
8_100_125
8_100_126
7_50_62
7_50_63
7_50_64
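Since every tile has four children at the next zoom level, the lower-zoom tile that contains a given tile can be found by halving its row and column once per level. The sketch below assumes the `zoom_row_col` id layout used earlier; the function names are hypothetical.

```python
from collections import defaultdict


def container_tile_id(tile_id, container_zoom):
    """Return the id of the ancestor tile at container_zoom."""
    zoom, row, col = (int(part) for part in tile_id.split('_'))
    if container_zoom > zoom:
        raise ValueError("container zoom must not exceed the tile's zoom")
    shift = zoom - container_zoom  # number of levels to climb
    return f"{container_zoom}_{row >> shift}_{col >> shift}"


def group_by_container(tile_ids, container_zoom):
    """Group tile ids under the container tile each one falls in."""
    groups = defaultdict(list)
    for tile_id in tile_ids:
        groups[container_tile_id(tile_id, container_zoom)].append(tile_id)
    return dict(groups)


groups = group_by_container(["9_201_249", "8_100_124", "7_50_62"], 6)
```

For example, `9_201_249`, `8_100_124`, and `7_50_62` all fall inside `6_25_31`, so their counts could be written into one precomputed result set for that container tile.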
User Id Activity Id Timestamp Latitude Longitude

10152875766888406 57169639 1445377623000 36.966819 -122.012298
10152875766888406 57169639 1445377625000 36.966845 -122.012248
10152875766888406 57169639 1445377627000 36.966877 -122.012228
10152875766888406 57169639 1445377629000 36.966913 -122.012236
10152875766888406 57169639 1445377630000 36.966946 -122.012236
10152875766888406 57169639 1445377631000 36.966984 -122.012263
10152875766888406 57169639 1445377632000 36.967027 -122.012281
Raw Data Elevation
Enriched
let azure = require('azure-storage'),
    elevationService = require('../services/elevation');

module.exports = function(context, locationBlob) {
    // The whole blob arrives as one string; split it into one location per line.
    let locations = locationBlob.split('\n');
    elevationService.enrichLocations(locations, (err, enrichedLocations) => {
        if (err) return context.done(err);
        // ... save enrichedLocations to blob ...
        context.done();
    });
};
JSON: 560 GB
Avro: 224 GB (60% smaller)
• geotile: http://github.com/timfpark/geotile
  XYZ tile math in C#, JavaScript, and Python
• heatmap: http://github.com/timfpark/heatmap
  Spark code for building heatmaps
Tim Park
@timpark


Editor's Notes

  • #2: I'm Tim Park. Today I'm going to talk about processing geospatial data at scale.
  • #3: We are living in a world where everything and everybody has location data associated with it, whether from the vehicle they are in, the app on the phone that they are using, or the package that they are sending. Increasingly, this geospatial data is woven into the applications that we are building as developers. My talk today is about processing that geospatial data in Azure.
  • #4: My team at Microsoft has done a number of projects with customers and partners involving geospatial data. We've used geotagged social data to help the UN monitor humanitarian zones.
  • #5: We've worked with Guide Dogs for the Blind to build a device that uses data from. The app helps blind people discover where they are and what's around them, and helps navigate them to locations. Sara Spalding is going to talk more about this overall project here at DecodedConf, and Erik Schlegel is going to talk about how Guide Dogs has used OpenStreetMap from a technical perspective as well.
  • #6: And we've worked with the German physical display advertising firm Stroeer to process geotagged data, to build emotion maps of the overall sentiment in different parts of the country and of how different advertising interventions have affected these sentiments on both a micro and macro scale. In the course of working on all of these projects, we've identified a number of common patterns. My talk today is about those patterns.
  • #7: To make this discussion simpler than using one of our project datasets, I'm going to frame the talk today around a dataset near and dear to my heart. I love to explore the outdoors, so I am going to frame my talk around how we could take a large crowdsourced set of GPS traces from the outdoors and turn it into a trail map of the whole world, and along the way describe all of the patterns we've discovered in the projects that we have done.
  • #8: This scenario also allows me to open source the dataset and the code that I used so that you can try it at home.
  • #9: So what does the shape of this dataset look like? It's basically a whole lot of CSV files with location information that includes: user id, activity id (which identifies all of the data that is part of the same run, walk, hike, bike ride, etc.), timestamp, latitude, and longitude. There are many, many users, who each have many activities, which all have many timestamped locations.
  • #10: The dataset we are going to be working with is fairly large. It consists of over 500GB of data, which is larger than any one computing node can handle, and therefore we have to use larger scale data techniques in order to process it. In this 500GB, we have over 2.5 million different activities, with 3.5 billion different GPS locations associated with those activities.
  • #11: So what does the shape of this dataset look like? It's basically a whole lot of CSV files with location information that includes: a user id, activity id, timestamp, latitude, and longitude. There are many, many users, who each have many activities, which all have many timestamped locations.
  • #12: Now that we have an idea of the dataset, let's look at the final application we are trying to build, so we have better context on what we need in terms of the data processing.
  • #13: So we have this giant pile of incoming data. How do we process all of this at scale?
  • #14: Let's start by talking about how we store the GPS locations. As we talked about previously, an activity has a timestamp-ordered set of locations. To display an activity, we need to pull all of the associated locations.
  • #15: So what we need from the storage system we use for them is the ability to pull a range of timestamps. With this, we can pull all of the locations for a particular activity.
  • #16: This is pretty standard stuff for a database, but it becomes a challenge with a large dataset (reasons above).
  • #17: Fortunately, Azure has a very good solution for this. Azure Table Storage sits somewhere just above blob storage. First off, it is very inexpensive, at 2 cents per GB per month. Unlike blob storage, you can access ranges and individual rows in the data. Range queries are only efficient over a set of RowKeys within the same PartitionKey, but we only need to query on timestamp. It also scales with constant performance up to 500TB per storage account, which is well beyond our dataset. And it satisfies our need to be able to query a range of user locations by timestamp range.
  • #18: Looking at activity storage itself now, one of the key queries we want to do is querying the activities in a bounding box. We also, in the future, want to be able to filter activities on distance or duration.
  • #19: What this essentially means is that, unlike the location data, we need the activity data to be highly indexed. We want to be able to do queries like "Give me all of the activities under 10km in length near Dublin" or "Give me all of the activities over 1 hour in duration near Dublin". This means we need the highlighted columns of this data to be indexed, which is not a great fit for the Azure Table Storage that we saw in the first pattern. The good news is that there are three orders of magnitude less data as well, and this makes it a great fit for a relational database like MySQL, Postgres, or Azure SQL. Setting up tables with a schema allows us to make rich queries against the data.
  • #20: Which brings us to the second pattern for dealing with high scale data like this: using polyglot persistence, which is using multiple database types to solve a particular application's needs, each chosen because it excels at a portion of the problem that we are trying to solve. As we saw before, I am using Azure Table Storage for the location data. For the activity data, I am using PostgreSQL + PostGIS. The way this works is that we query for activities using Postgres and then query for the location data, when needed, from Table Storage. My colleague David Makogon is going to do a much longer talk about this topic, and will describe the strengths of each database type and the scenarios in which they make the most sense. OK, so that is how we are storing and querying activities: we load locations into Table Storage and activities into Postgres at creation and then can query them.
  • #21: When we back out to the progressive heatmap of activities that we saw, however, we realize we need a whole different set of tools. Our heatmap is generated by summing up the number of location samples in a particular geographical area. In this case, we are using small squares as the boundaries for our summaries. This means that it is operating over the whole dataset, and given the size of the dataset, we need to open our big data toolbox to realistically accomplish this.
  • #22: I'm going to use HDInsight Spark to accomplish this here. For those of you that have used Hadoop, it operates on data in a similar paradigm, but offers much better performance and a much nicer programming model than the original Hadoop engine. HDInsight Spark is Azure's hosted version of Apache Spark. It makes it easy to spin up a cluster of machines to process the data. This allows you to ignore the operations work of running a Spark cluster and focus on the actual problem you are trying to solve.
  • #23: Before we dive into the Spark code, let's discuss how we map a particular location point to a geographic summary. One of the common patterns we've seen is using an XYZ tile as a summarization bucket. This is not something we've invented. Google, Apple, and OpenStreetMap use the same concept for addressing the tiles in their maps. It is a fairly simple concept. The top level world is divided into 4 tiles. For each zoom level below that, you take the parent tile and divide it into 4 tiles. An individual tile is then addressed by its zoom level and its row and column within that zoom level.
  • #24: In order to compute our heatmap, we use a pretty standard map / reduce algorithm. For every location in the dataset, we generate a tile id key/value pair for every zoom level that we want results for. In this case, we are generating tile key/value pairs for the zoom levels from 10 to 18, because we know that that is the only set of tiles that the user interface will end up using.
  • #25: In Python, this is what this mapper looks like. We first compute the tileIds for all of the zoom levels that we want to collect results for, and then use them to build a tuple for each tileId.
  • #26: We then reduce all of these mappings with the same key down into an aggregate heatmap value. In Spark, the first element in a tuple is considered the key and the second element is considered the value. Which is to say, we take all of the locations with the same tile id from the previous step and count them.
  • #27: With this in mind, the actual implementation in Spark is fairly straightforward. We point it at the dataset in blob storage, parse it as JSON, and then use the tile_id_mapper function we defined earlier to map each location to the appropriate result for each zoom level. We then reduce all of these individual results by key to get a final total for each tile. We implement the reducer as an anonymous function, called a lambda in Python, that essentially sums the intermediate aggregates for tiles with the same id.
  • #28: From a programming standpoint, Spark makes this look really easy. But at the physical level, Spark is doing a lot of work for us. Remember, we are working on high scale data, a dataset that doesn't fit on a single node, so there will potentially be billions of those summaries floating around across the cluster. During the map stage, any tile id could be generated by any of the mappers in the Spark cluster, since locations are uniformly distributed in the dataset. This means that there needs to be a shuffle step in which the mappings with the same tile id are assigned to the same reducer so that a correct aggregate can be calculated. The good news is that Spark handles all of this under the covers for us.
  • #29: * We don't usually have static data, but data that is continually arriving. * In this case, we have an app using an API. > Spark can't run against a database. > Spark works better with a small number of large files. > Ideally we'd like to map these incoming small activities to a set of large aggregate files. * Event Hub: a giant cloud buffer in front of the backend infrastructure. * Data automatically expires in the future; this is a feature. * Helpful 1) in bursty situations and 2) when you don't need the data until the future. > Then we use Stream Analytics as a receiver on this Event Hub. > Stream Analytics allows you to transform or summarize streams using a SQL-like language. > But we are just using it to land the data. * Using these pieces of infrastructure in conjunction with each other to enable incremental ingestion of data is a key pattern for high scale data ingestion.
  • #30: Incremental ingestion leads us to our next pattern, which is processing this incremental data in slices. We process these slices individually in a manner analogous to how we processed the whole dataset in the previous slides. But since we are only processing an individual slice, we do not need nearly as large a cluster to do the processing. We then load in the previous complete result, fold in this newly computed partial heatmap, and then write out the new heatmap. Although this adds a second step, we are operating on a much smaller set of data, and overall it is much more efficient.
  • #31: Many of you might have suspected it when you saw this the first time, but there is a fairly large set of data that is sent with each of these heatmap queries. If we made a traditional database query against a datastore for each of these XYZ summaries, you'd likely end up with a solution that didn't scale well or scaled pretty expensively.
  • #32: Instead of querying for the heatmaps, we precompute the heatmap elements that should be displayed within a particular view. We use a lower zoom level block, shown here, as a container for the higher zoom level elements, and then precompute what should be displayed. This not only turns a querying problem into a problem of sending text from our web frontends, but also allows us to cache each of these result sets in the browser. We store all of these result sets in blob storage, which is 2c per GB a month, and very inexpensive compared to the comparable number of VMs that you'd need to do this.
  • #33: So architecturally, it looks like this. We use the slice architecture that I talked about in a previous slide and use the deltas to determine which heatmaps require updates, build the new heatmaps for each of these result sets, and write these out to blob storage.
  • #34: There is one other thing that I glossed over when we described displaying activities. As you remember, the application has an elevation graph associated with each activity.
  • #35: But, also remember, our input dataset does not have elevation as part of it.
  • #36: This is another common pattern. It's almost always the case that you need to transform your raw data by removing dirty data points and/or enriching it to include related data. In this case, we want to transform the raw data we have by adding elevation data that we are sourcing from Bing, output these enriched slices into their own blobs, and then use this enriched data for our processing. We want to run this enrichment on the blobs as they are added to blob storage.
  • #37: The great news is that Azure has a nice new feature called Functions that does just this. Azure Functions allows you to set up a trigger on a wide variety of events, including queued events, HTTP requests, service bus, timers, etc. It builds on top of our Azure App Services Web Jobs functionality to provide a super easy interface. When one of these triggers fires, a function that you have written is executed. In this case, I wrote my function in JavaScript, but C# and a host of other languages are also supported. I also set up this Azure Function to trigger on a new blob being added to a storage account container. So each time a blob is created by the previous incremental ingestion pattern, it is passed into this function so that it can be enriched with elevation data. Note that the whole blob is passed into the function, so you need to make sure that the blobs for your scenario will fit in memory. Once the locations in the blob have been enriched, we then push these out to another blob, which we use downstream.
  • #38: JSON is a fantastic format and very developer friendly. I used it in this project for clarity, since it results in output that is human readable. However, if you go to do a project like this for real, choose a data serialization format like Avro instead. By establishing a schema and using a binary serialization format, you can achieve a 60% size reduction. This improves the performance of deserializing and serializing the data from and to blob storage. It also linearly cuts your data storage costs.
  • #39: OK, that's what I have for you today. I've open sourced a couple of things as part of this presentation that you should have a look at if you are interested in more details: geotile and heatmap. The latter includes a sample dataset that you can work against.
  • #40: The other thing I will share out is these slides, via Twitter, so follow me @timpark if you'd like to get a copy of those. And with that, thank you for coming out today for the talk, and I'd be happy to take any questions.