The original, possibly unreadable, rainbow-themed presentation on thinking about how to aggregate data using a processing framework of your choice (MapReduce is the one I used) and Accumulo's Iterators.
2. Have you heard of...
TSAR, Summingbird? (Twitter)
Mesa? (Google)
Commutative Replicated Data Types?
These all describe systems that pre-compute
aggregations over large datasets using
associative and/or commutative functions.
3. What do we need to pull this off?
We need data structures that can be combined together. Numbers are a
trivial example: we can combine two numbers using a function such as
addition or multiplication. More advanced data structures such as
matrices, HyperLogLogPlus, StreamSummary (used for top-k), and Bloom
filters also have this property!
val partial: T = op(a, b)
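To make op(a, b) concrete, here is a minimal Scala sketch (mine, not from the original deck) of two such combinable structures: longs under addition and sets under union.

object CombineExamples {
  // numbers combine under addition
  val plus: (Long, Long) => Long = _ + _
  // sets combine under union
  val union: (Set[String], Set[String]) => Set[String] = _ ++ _

  val partialSum: Long = plus(3L, 4L)                         // 7
  val partialSet: Set[String] = union(Set("bwi"), Set("dia")) // Set(bwi, dia)
}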
4. What do we need to pull this off?
We need operations that can be performed in parallel. Twitter espouses
associative operations, but for our case, operations that are both
associative and commutative have the nicer property that we get correct
results no matter what order we receive the data in. Common operations
that are associative (summation, set building) are also commutative.
op(op(a, b), c) == op(a, op(b, c))
op(a, b) == op(b, a)
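As a quick sanity check of those two laws (my addition, not on the slide), here they are instantiated with plain long addition:

object LawCheck {
  def op(a: Long, b: Long): Long = a + b

  val (a, b, c) = (2L, 5L, 11L)
  assert(op(op(a, b), c) == op(a, op(b, c))) // associative
  assert(op(a, b) == op(b, a))               // commutative
}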
5. Wait a minute, isn't that...
You caught me! It's a commutative monoid!
From Wolfram:
Monoid: A monoid is a set S that is closed under an associative binary
operation and has an identity element I in S such that for all a in S,
Ia = aI = a.
Commutative Monoid: A monoid that is commutative, i.e., a monoid M such
that for every two elements a and b in M, ab = ba.
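One minimal way to encode this in Scala (my sketch, not part of the deck): a commutative monoid is just a combine operation plus an identity element, and longs under addition are an instance.

trait CommutativeMonoid[T] {
  def identity: T
  def op(a: T, b: T): T // assumed associative and commutative
}

// longs under addition, with identity 0
object LongSum extends CommutativeMonoid[Long] {
  val identity = 0L
  def op(a: Long, b: Long): Long = a + b
}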
6. Put it to work
The example we're about to see uses
MapReduce and Accumulo. The same can be
accomplished using any processing framework
that supports map and reduce operations, such
as Spark or Storm's Trident interface.
7. We need two functions...
Map
Takes an input datum and turns it into some combinable structure
Like parsing strings to numbers, or creating single-element sets for combining
Reduce
Combines the mergeable data structures using our
associative and commutative function
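A rough sketch of those two functions for the counting case (my code, not the deck's; the Trip record type is hypothetical):

object TwoFunctions {
  // hypothetical input record: one observed trip between two cities
  case class Trip(origin: String, destination: String)

  // Map: turn a raw datum into a (key, combinable value) pair
  def map(t: Trip): ((String, String), Long) =
    ((t.origin, t.destination), 1L)

  // Reduce: merge the partials for one key with our associative,
  // commutative function (here, addition)
  def reduce(key: (String, String), values: Iterable[Long]): Long =
    values.foldLeft(0L)(_ + _)
}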
8. Yup, that's all!
Map will be called on the input data once in a
Mapper instance.
Reduce will be called in a Combiner, Reducer
and an Accumulo Iterator!
The Accumulo Iterator is configured to run on
major compactions, minor compactions, and
scans
That's five places the same piece of code gets run. Talk about modularity!
9. What does our Accumulo Iterator
look like?
We can re-use Accumulo's Combiner type here:
override def reduce(key: Key, values: Iterator[Value]): Value = {
  // deserialize and combine all intermediate values.
  // This logic should be identical to what is in the
  // mr.Combiner and Reducer
}
Our function has to be commutative because major compactions often pick
only a subset of the files to combine, which means we may only see a
discrete subset of the data in any single iterator invocation.
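For reference, a fleshed-out version of that combiner might look like the following sketch. It assumes (my choice, not stated on the slide) that each Value holds a long encoded as a UTF-8 string; Accumulo also ships a SummingCombiner that covers this exact case.

import java.nio.charset.StandardCharsets.UTF_8
import java.util.{Iterator => JIterator}

import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.iterators.Combiner

// Hypothetical combiner that sums long-valued cells for a key
class LongSumCombiner extends Combiner {
  override def reduce(key: Key, values: JIterator[Value]): Value = {
    var sum = 0L
    while (values.hasNext) {
      sum += new String(values.next().get(), UTF_8).toLong
    }
    new Value(sum.toString.getBytes(UTF_8))
  }
}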
10. Counting in practice (pt 1)
We've seen how to aggregate values together. What's the
best way to structure our data and query it?
Twitter's TSAR is a good starting point. It allows users to
declare what they want to aggregate:
Aggregate(
onKeys((origin, destination))
producing(Count))
This describes generating an edge between two cities and
calculating a weight for it.
11. Counting in practice (pt 2)
With that declaration, we can infer that the user wants to sum over each
instance of a given pairing, so the base value for each occurrence is 1
(sounds a bit like word count, huh?). We also need a key under which each
base value and partial computation can be reduced. For this simple pairing
we can use a schema like:
<field_1>\0<value_1>\0...<field_n>\0<value_n> count: [] <serialized long>
I recently traveled from Baltimore to Denver. Here's what that
trip would look like:
origin\0bwi\0destination\0dia count: [] \x01
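On the MapReduce side, the map function that produces those base values might look like this sketch (mine, not the deck's), assuming input lines are simple "origin,destination" CSV records and that rows use the null-byte-delimited schema above:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

class TripCountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  private val Sep = "\u0000"
  private val one = new LongWritable(1L)

  override def map(offset: LongWritable, line: Text,
                   context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
    // assumes a well-formed "origin,destination" input line
    val Array(origin, destination) = line.toString.split(",", 2)
    val row = s"origin$Sep${origin.toLowerCase}${Sep}destination$Sep${destination.toLowerCase}"
    context.write(new Text(row), one)
  }
}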
12. Counting in practice (pt 3)
Iterator combines all values that are mapped to
the same key
We encoded the aggregation function into the
column family of the key
We can arbitrarily add new aggregate functions by
updating a mapping of column family to function
and then updating the iterator deployment
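One way to realize that column-family-to-function mapping (my sketch; the merge functions are stand-ins for real deserialize-and-combine logic):

import org.apache.accumulo.core.data.Value

object AggregateRegistry {
  type Merge = (Value, Value) => Value

  // column family -> how to combine two serialized values stored under it
  val byColumnFamily: Map[String, Merge] = Map(
    "count" -> sumLongs _,
    "topk"  -> mergeTopK _
  )

  def sumLongs(a: Value, b: Value): Value = ???  // deserialize longs, add, re-serialize
  def mergeTopK(a: Value, b: Value): Value = ??? // merge top-k structures
}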
13. Something more than counting
Everybody counts, but what about something
like top-k?
The key schema isn't flexible enough to show a
relationship between two fields
We want to know the top-k relationship
between origin and destination cities
That column qualifier was looking awfully
blank. It'd be a shame if someone were to put
data in it...
14. How you like me now?
Aggregate(
onKeys((origin))
producing(TopK(destination)))
<field1>\0<value1>\0...<fieldN>\0<valueN> <op>: <relation> [] <serialized data structure>
Let's use my Baltimore->Denver trip as an
example:
origin\0BWI topk: destination [] {DIA: 1}
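The serialized data structure there would be something like a StreamSummary. As a simplified stand-in (my sketch, not a real sketch-merging implementation), the reduce step can be modeled as summing per-destination counts and keeping the k largest, which is why arrival order doesn't matter:

object TopKMerge {
  // merge two partial {destination -> count} maps and keep the k heaviest
  // (truncating to k is what makes real top-k structures approximate)
  def merge(a: Map[String, Long], b: Map[String, Long], k: Int): Map[String, Long] = {
    val combined = (a.keySet ++ b.keySet)
      .map(dest => dest -> (a.getOrElse(dest, 0L) + b.getOrElse(dest, 0L)))
      .toMap
    combined.toSeq.sortBy(-_._2).take(k).toMap
  }
}

// merge(Map("DIA" -> 1), Map("DIA" -> 2, "LAX" -> 1), k = 2)
//   == Map("DIA" -> 3, "LAX" -> 1)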
15. But how do I query it?
This schema is really geared towards point
queries
Users would know exactly which dimensions
they were querying across to get an answer
BUENO: What are the top-k destinations for Bill when he leaves BWI?
NO BUENO: What are all the dimensions and aggregations I have for Bill?
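For the "bueno" case, a point query is just an exact-row scan. A hedged sketch against the Accumulo client API, with a hypothetical table name and an already-created Connector:

import org.apache.accumulo.core.client.Connector
import org.apache.accumulo.core.data.Range
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.io.Text

import scala.collection.JavaConverters._

object PointQuery {
  private val Sep = "\u0000"

  // What are the top-k destinations for trips leaving BWI?
  def topKFromBwi(conn: Connector): Unit = {
    val scanner = conn.createScanner("aggregates", new Authorizations())
    scanner.setRange(Range.exact("origin" + Sep + "BWI"))
    scanner.fetchColumnFamily(new Text("topk"))
    scanner.asScala.foreach { entry =>
      println(s"${entry.getKey.getColumnQualifier} -> ${entry.getValue}")
    }
  }
}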
16. Ruminate on this
Prepare functions
Preparing the input to do things like time bucketing and
normalization (Jared Winick's Trendulo); a small sketch follows this slide.
Age off
Combining down to a single value means that value represents all
historical data. Maybe we don't care about that and would like to
age off data after a day/week/month/year. Mesa's batch IDs
could be of use here.
Security labels
Notice how I deftly avoided this topic. We should be able to
bucket aggregations based on visibility, but we need a way to
express how that should best be handled. Maybe just preserve the
input data's security labeling and attach it to the output of our
map function?
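As one example of the prepare-function idea above, a small sketch (mine; the hour granularity is an arbitrary choice) that buckets an event timestamp before it ever reaches the map function:

import java.time.Instant
import java.time.temporal.ChronoUnit

object Prepare {
  // truncate an event time to the hour so aggregates are bucketed per hour
  // rather than rolled up over all history
  def hourBucket(eventTimeMillis: Long): Long =
    Instant.ofEpochMilli(eventTimeMillis).truncatedTo(ChronoUnit.HOURS).toEpochMilli
}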
17. FIN
(hope this wasn't too hard to read)
Comments, suggestions or inflammatory messages should be
sent to @BillSlacum or wslacum@gmail.com