Let's Aggregate 
@BillSlacum 
Accumulo Meetup, Sep 23, 2014
Have you heard of... 
 TSAR, Summingbird? (Twitter) 
 Mesa? (Google) 
 Commutative Replicated Data Types? 
Each describes a system that pre-computes 
aggregations over large datasets using 
associative and/or commutative functions.
What do we need to pull this off? 
We need data structures that can be combined 
together. Numbers are a trivial example of this, 
as we can combine two numbers using a 
function (such as plus or multiply). There are 
more advanced data structures such as 
matrices, HyperLogLogPlus, StreamSummary 
(used for top-k) and Bloom filters that also have 
this property! 
val partial: T = op(a, b)
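As a quick illustration (mine, not the slides'), a few combinable values in plain Scala, where plus, multiply, and set union each play the role of op:
// Plain-Scala illustration: op is just a binary function over the structure.
val sum: Long = 2L + 3L                           // op = plus      -> 5
val product: Long = 2L * 3L                       // op = multiply  -> 6
val merged: Set[Int] = Set(1, 2) union Set(2, 3)  // op = set union -> Set(1, 2, 3)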
What do we need to pull this off? 
We need operations that can be performed in 
parallel. Associative operations are what Twitter 
espouses, but for our case, operations that are 
both associative and commutative have the 
stronger property that we get correct results 
no matter what order we receive the data in. 
Common operations that are associative 
(summation, set building) are also 
commutative. 
op(op(a, b), c) == op(a, op(b, c)) 
op(a, b) == op(b, a)
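A quick sanity check of both laws in code (my own illustration), using summation as op:
// Checking associativity and commutativity for Long addition.
object LawCheck extends App {
  def op(a: Long, b: Long): Long = a + b
  val (a, b, c) = (2L, 5L, 11L)
  assert(op(op(a, b), c) == op(a, op(b, c)))  // associative
  assert(op(a, b) == op(b, a))                // commutative
}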
Wait a minute isn't that... 
You caught me! It's a commutative monoid! 
From Wolfram: 
Monoid: A monoid is a set S that is closed under 
an associative binary operation and has an 
identity element I in S such that for all a in S, 
Ia = aI = a. 
Commutative Monoid: A monoid that is 
commutative, i.e., a monoid M such that for 
every two elements a and b in M, ab = ba.
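One hypothetical way to encode that definition in Scala (the talk never defines such a type, and libraries like Twitter's Algebird ship their own):
// A hypothetical encoding of the definition above; empty is the identity element I.
trait CommutativeMonoid[T] {
  def empty: T           // identity: op(empty, a) == op(a, empty) == a
  def op(a: T, b: T): T  // must be associative and commutative
}
// The instance used for counting: (Long, +, 0).
object LongSum extends CommutativeMonoid[Long] {
  val empty: Long = 0L
  def op(a: Long, b: Long): Long = a + b
}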
Put it to work 
The example we're about to see uses 
MapReduce and Accumulo. The same can be 
accomplished using any processing framework 
that supports map and reduce operations, such 
as Spark or Storm's Trident interface.
We need two functions... 
Map 
 Takes an input datum and turns it into some combinable structure 
 Like parsing strings to numbers, or creating single-element sets for combining 
Reduce 
 Combines the merge-able data structures using our associative and commutative function (a small sketch follows)
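A minimal sketch of what those two functions could look like for the examples above (the names are mine, not the talk's):
object MapReduceSketch {
  // Map: turn an input datum into something combinable.
  def mapToCount(s: String): Long = s.trim.toLong  // parse a string to a number
  def mapToSet(s: String): Set[String] = Set(s)    // or wrap it in a single-element set

  // Reduce: combine partials with the associative, commutative op.
  def reduceCounts(partials: Iterable[Long]): Long =
    partials.foldLeft(0L)(_ + _)
  def reduceSets(partials: Iterable[Set[String]]): Set[String] =
    partials.foldLeft(Set.empty[String])(_ union _)
}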
Yup, that's all! 
 Map will be called on the input data once in a 
Mapper instance. 
 Reduce will be called in a Combiner, Reducer 
and an Accumulo Iterator! 
 The Accumulo Iterator is configured to run on 
major compactions, minor compactions, and 
scans 
 That's five places the same piece of code gets 
run-- talk about modularity!
What does our Accumulo Iterator 
look like? 
 We can re-use Accumulo's Combiner type here: 
override def reduce(key: Key, values: java.util.Iterator[Value]): Value = { 
// deserialize and combine all intermediate 
// values. This logic should be identical to 
// what is in the mr.Combiner and Reducer 
} 
 Our function has to be commutative because major 
compactions will often pick smaller files to combine, 
which means we only see discrete subsets of data in an 
iterator invocation
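A slightly fuller sketch of a counting combiner; the 8-byte long encoding is an assumption of mine, and note that Accumulo already ships combiners (e.g. SummingCombiner) for exactly this case:
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.iterators.Combiner

// Sums values assumed to be 8-byte big-endian longs. The same fold could back
// the MapReduce Combiner and Reducer so all five call sites share one op.
class EdgeCountCombiner extends Combiner {
  override def reduce(key: Key, values: JIterator[Value]): Value = {
    var sum = 0L
    while (values.hasNext) {
      sum += ByteBuffer.wrap(values.next().get()).getLong
    }
    new Value(ByteBuffer.allocate(8).putLong(sum).array())
  }
}
Attach it to the table for the scan, minc, and majc scopes and the same reduce runs in all three iterator contexts.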
Counting in practice (pt 1) 
We've seen how to aggregate values together. What's the 
best way to structure our data and query it? 
Twitter's TSAR is a good starting point. It allows users to 
declare what they want to aggregate: 
Aggregate( 
onKeys((origin, destination)) 
producing(Count)) 
This describes generating an edge between two cities and 
calculating a weight for it.
Counting in practice (pt 2) 
With that declaration, we can infer that the user wants their 
operation to be summing over each instance of a given pairing, 
so we can say the base value is 1 (sounds a bit like word 
count, huh?). We need a key under which each base value and partial 
computation gets reduced. For this simple pairing we can 
have a schema like: 
<field_1>\0<value_1>\0...\0<field_n>\0<value_n> count: [] <serialized long> 
I recently traveled from Baltimore to Denver. Here's what that 
trip would look like: 
origin\0bwi\0destination\0dia count: [] \x01
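For illustration, here is roughly how a job could emit that base value as an Accumulo Mutation; the null-byte separators, the count column family, and the base value of 1 follow the schema above, while the 8-byte encoding and names are assumptions:
import java.nio.ByteBuffer
import org.apache.accumulo.core.data.{Mutation, Value}
import org.apache.hadoop.io.Text

object CountEmitter {
  // Build the mutation for one observed (origin, destination) pair.
  def countMutation(origin: String, destination: String): Mutation = {
    // Fields joined with the null-byte separator from the schema above.
    val row = s"origin\u0000$origin\u0000destination\u0000$destination"
    val m = new Mutation(new Text(row))
    m.put(new Text("count"), new Text(""),
      new Value(ByteBuffer.allocate(8).putLong(1L).array()))
    m
  }
}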
Counting in practice (pt 3) 
 Iterator combines all values that are mapped to 
the same key 
 We encoded the aggregation function into the 
column family of the key 
 We can arbitrarily add new aggregate functions by 
updating a mapping of column family to function 
and then updating the iterator deployment
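One way to picture that mapping (the helper and encodings here are illustrative, not the talk's actual code):
import java.nio.ByteBuffer

// Dispatch table from column family to combine function.
object CombinerRegistry {
  private def sumLongs(values: Seq[Array[Byte]]): Array[Byte] = {
    val total = values.map(ByteBuffer.wrap(_).getLong).sum
    ByteBuffer.allocate(8).putLong(total).array()
  }

  // Adding a new aggregation means adding an entry here and redeploying the iterator.
  val byColumnFamily: Map[String, Seq[Array[Byte]] => Array[Byte]] = Map(
    "count" -> (sumLongs _)
    // "topk" -> a top-k merge function, and so on
  )

  def combine(cf: String, values: Seq[Array[Byte]]): Array[Byte] =
    byColumnFamily(cf)(values)
}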
Something more than counting 
 Everybody counts, but what about something 
like top-k? 
 The key schema isn't flexible enough to show a 
relationship between two fields 
 We want to know the top-k relationship 
between origin and destination cities 
 That column qualifier was looking awfully 
blank. It'd be a shame if someone were to put 
data in it...
How you like me now? 
 Aggregate( 
onKeys((origin)) 
producing(TopK(destination))) 
 <field1>\0<value1>\0...\0<fieldN>\0<valueN> <op>: <relation> [] <serialized data structure> 
 Let's use my Baltimore->Denver trip as an 
example: 
origin\0BWI topk: destination [] {DIA: 1}
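A deliberately simplified stand-in for that serialized structure (a real deployment might serialize something like stream-lib's StreamSummary instead): the partial state is just a destination-to-count map, merged by summing, which keeps the op associative and commutative.
object TopKSketch {
  // Merge two partial states by summing per-destination counts.
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    (a.keySet ++ b.keySet)
      .map(dest => dest -> (a.getOrElse(dest, 0L) + b.getOrElse(dest, 0L)))
      .toMap

  // Read the top k entries out of a merged state.
  def topK(counts: Map[String, Long], k: Int): Seq[(String, Long)] =
    counts.toSeq.sortBy(-_._2).take(k)
}
// The BWI example above starts life as Map("DIA" -> 1L).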
But how do I query it? 
 This schema is really geared towards point 
queries 
 Users would know exactly which dimensions 
they were querying across to get an answer 
 BUENO What are the top-k destinations for Bill 
when he leaves BWI? 
 NO BUENO What are all the dimensions and 
aggregations I have for Bill?
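A sketch of the bueno point query against such a table; the connector setup and table name are assumed, while createScanner, Range.exact, and fetchColumnFamily are standard Accumulo client calls:
import org.apache.accumulo.core.client.Connector
import org.apache.accumulo.core.data.Range
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.io.Text
import scala.collection.JavaConverters._

object PointQuery {
  // "What are the top-k destinations for trips leaving a given origin?"
  def topKDestinations(conn: Connector, table: String, origin: String): Unit = {
    val scanner = conn.createScanner(table, Authorizations.EMPTY)
    scanner.setRange(Range.exact(s"origin\u0000$origin"))  // exact row = point query
    scanner.fetchColumnFamily(new Text("topk"))
    for (entry <- scanner.asScala) {
      // entry.getValue holds the serialized top-k structure; deserialize it here.
      println(s"${entry.getKey.getColumnQualifier} -> ${entry.getValue}")
    }
  }
}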
Ruminate on this 
 Prepare functions 
 Preparing the input to do things like time bucketing and 
normalization (Jared Winick's Trendulo) 
 Age off 
 Combining down to a single value means that value represents all 
historical data. Maybe we don't care about that and would like to 
age off data after a day/week/month/year. Mesa's batch IDs 
could be of use here. 
 Security labels 
 Notice how I deftly avoided this topic. We should be able to 
bucket aggregations based on visibility, but we need a way to 
express the best way to handle this. Maybe just preserve the 
input data's security labeling and attach it to the output of our 
map function?
FIN 
(hope this wasn't too hard to read) 
Comments, suggestions or inflammatory messages should be 
sent to @BillSlacum or wslacum@gmail.com
