際際滷

際際滷Share a Scribd company logo
Guest Lecture Eindhoven University of Technology
                                      Notes on Data-Intensive Processing
                                                with Hadoop MapReduce
                                                                                                 Evert Lammerts
                                                                                                   May 30, 2012




Image source: http://valley-of-the-shmoon.blogspot.com/2011/04/pushing-elephant-up-stairs.html
To start with...

   About me

    Note on this lecture
       Adapted from Jimmy Lin's Cloud Computing course...
        http://www.umiacs.umd.edu/~jimmylin/cloud-2010-Spring/index.html
        and from Jimmy's slidedeck from the SIKS Big Data course and his talk at UvA
        http://www.umiacs.umd.edu/~jimmylin/
       Today's slides available at
        http://www.slideshare.net/evertlammerts

    About you
       Big Data?
       Cloud computing?
       Supercomputing?
       Hadoop and / or MapReduce?
The lecture

   Why Big Data?
   How Big Data?

   MapReduce
   Implementations
Why Big Data?




The Economist, Feb 25th 2010
1. Science

   The emergence of the 4th paradigm
       http://research.microsoft.com/en-us/collaboration/fourthparadigm/
       CERN stores 15 PB LHC data per year, a fraction of the actual produced
        data
       Square Kilometer Array expectation: 10 PB / hour




                        Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
2. Engineering

              Count and normalize




http://infrawatch.liacs.nl/




                               Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
3. Commerce

   Know thy customers
   Data  Insights  Competitive advantages
       Google was processing 20 PB each day... in 2008!
       FaceBook's collected 25 TB of HTTP logs each day... in 2009!
       eBay had ~9 PB of user data, and a growth rate of more than 50 TB /
        day in 2011




                       Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
IEEE Intelligent Systems, March/April 2009
s/knowledge/data/g




  Jimmy Lin, University of Maryland / Twitter, 2011
Also see

   P. Russom, Big Data Analytics, The Data Warehousing Institute, 2011
   James G. Kobielus, The Forrester Wave: Enterprise Hadoop
    Solutions, Forrester Research, 2012
   James Manyika et al., Big data: The next frontier for innovation,
    competition, and productivity, McKinsey Global Institute, 2011
   Dirk de Roos et al., Understanding Big Data: Analytics for Enterprise
    Class Hadoop and Streaming Data, IBM, 2011


    Etcetera
How Big Data?
Hadoop.mapreduce
Divide and Conquer



                           Work
                                                                    Partition


  w1                           w2                              w3

worker                    worker                     worker


  r1                            r2                             r3




                          Result                                  Combine




           Jimmy Lin, University of Maryland / Twitter, 2011
Amdahl's Law
Challenges in Parallel systems

   How do we divide the work into separate tasks?
   How do we get these tasks to our workers?
   What if we have more tasks than workers?
   What if our tasks need to exchange information?
   What if workers crash? (That's no exception!)
   How do we aggregate results?
Managing Parallel Applications

   A synchronization mechanism is needed
       To coordinate communication (like exchanging state) between workers
       To manage access to shared resources like data

   What if you don't?
       Mutual Exclusion
       Resource Starvation
       Race Conditions
       Dining philosophers, sleeping barber, cigarette smokers, readers-writers,
        producers-consumers, etcetera



                      Managing parallelism is hard!
Source: Ricardo Guimar達es Herrmann
Well known tools and patterns

   Programming models                                        Shared Memory                  Message Passing


        Shared memory (pthreads)




                                                                                   Memory
    


       Message passing (MPI)
   Design patterns                                         P1 P2 P3 P4 P5                   P1 P2 P3 P4 P5


       Master-slave
       Producer-consumer
       Shared queues

                        producer consumer
           master




                                                                                work queue

           slaves

                                         producer consumer




                            Jimmy Lin, University of Maryland / Twitter, 2011
From Von Neumann...




http://www.lrr.in.tum.de/~jasmin/neumann.html
 to a datacenter
Hadoop.mapreduce
Where to go from here

   The search for the right level of abstraction
       How do we build an architecture for a scaled environment?
       From HAL to DCAL

   Hiding parallel application management from the developer
       It's hard!

   Separating the what from the how
       The developer specifies the computation
       The runtime environment handles the execution




           Barosso, 2009
Ideas on scaling

   Scale out, don't scale up
       Hard upper-bound on the capacity of a single machine
       No upper-bound on the amount of machines you can buy (in theory)

   When dealing with large data...
       Prefer sequential reads over random reads
        & rather not store a trillion small files, but a million big ones
            Disk access is slow, but throughput is reasonable!
       Try to understand when a NAS / SAN architecture is really necessary
            It's expensive to scale!
MapReduce
An abstraction of typical large-data problems

(1) Iterate over a large number of records
(2) Extract something of interest from each
(3) Shuffle and sort intermediate results
(4) Aggregate intermediate results
(5) Generate final output
An abstraction of typical large-data problems

(1) Iterate over a large number of records
                                           M
(2) Extract something of interest from each A   P
(3) Shuffle and sort intermediate R
                                  results
                                  ED
(4) Aggregate intermediate results U
                                        C
(5) Generate final output                E




   MapReduce provides a functional abstraction of step 2 and step 4
Roots in functional programming

Map(S: array, f())
   Apply f(s  S) for all items in S


Fold(S: array, f())
   Recursively apply f() to each item in S and the result of the previous
    operation, or nil if such an operation does not exist




                                  Source: Wikipedia
MapReduce

The programmer specifies two functions:
   map(k, v)  <k', v'>*
   reduce(k', v'[ ])  <k', v'>*
       All values associated with the same key are sent to the same reducer


The execution framework handles everything else
k1 v1 k2 v2 k3 v3 k4 v4 k5 v5 k6 v6




 map                 map                    map                       map


a 1    b 2        c 3     c 6           a 5     c 2             b 7     c 8

      Shuffle and Sort: aggregate values by keys
             a    1 5              b    2 7              c    2 3 6 8




        reduce                reduce                reduce


          r1 s1                 r2 s2                 r3 s3




                  Jimmy Lin, University of Maryland / Twitter, 2011
MapReduce Hello World: WordCount

   Question: how can we count unique words in a given text?
       Line-based input (a record is one line)
       Key: position of first character in the whole document
       Value: a line not including the EOL character
       Input looks like:
           Key: 0,     value: a wise old owl lived in an oak
           Key: 31,    value: the more he saw the less he spoke
           Key: 63,    value: the less he spoke the more he heard
           Key: 99,    value: why can't we all be like that wise old bird
       Output looks like:
           (a,1)            (an,1)       (be,1)
           (he,4)           (in,1)       (we,1)
           (all,1)          (oak,1)      (old,2)
           (owl,1)          (saw,1)      (the,4)
           (why,1)          (bird,1)     (less,2)
           (like,1)         (more,2)     (that,1)
           (wise,2)         (can't,1)    (heard,1)
           (lived,1)        (spoke,2)
MapReduce Hello World: WordCount
MapReduce

The programmer specifies two functions:
   map(k, v)  <k', v'>*
   reduce(k', v'[ ])  <k', v'>*
       All values associated with the same key are sent to the same reducer


The execution framework handles ? everything else ?
MapReduce execution framework

   Handles scheduling
       Assigns map and reduce tasks to workers
       Handles data-awareness: moves processes to data
   Handles synchronization
       Gathers, sorts, and shuffles intermediate data
   Handles errors and faults
       Detects worker failures and restarts
   Handles communication with the distributed filesystem
MapReduce

The programmer specifies two functions:
   map (k, v)  <k', v'>*
   reduce (k', v'[ ])  <k', v'>*
        All values associated with the same key are sent to the same reducer


The execution framework handles everything else...
Not quite... usually, programmers also specify:
   partition (k', number of partitions)  partition for k'
       Often a simple hash of the key, e.g., hash(k') mod n
       Divides up key space for parallel reduce operations
   combine (k', v')  <k', v'>*
       Mini-reducers that run in memory after the map phase
       Used as optimization to reduce network traffic
k1 v1 k2 v2 k3 v3 k4 v4 k5 v5 k6 v6




  map                   map                   map                        map


a 1    b 2           c 3     c 6            a 5    c 2             b 7     c 8

 combine              combine                combine                 combine



a 1    b 2                 c 9              a 5    c 2             b 7     c 8

 partition             partition             partition               partition

      Shuffle and Sort: aggregate values by keys
               a     1 5              b     2 7             c     2 9 8
                                                                    3 6




         reduce                    reduce                reduce


             r1 s1                  r2 s2                 r3 s3




                     Jimmy Lin, University of Maryland / Twitter, 2011
Quick note...

The term MapReduce can refer to:
   The programming model
   The execution framework
   The specific implementation
Implementation(s)
MapReduce implementations

   Google (C++)
       Dean & Ghemawat, MapReduce: simplified data processing on large
        clusters, 2004
       Ghemawat, Gobioff, Leung, The Google File System, 2003
   Apache Hadoop (Java)
       Open source implementation
       Originally led by Yahoo!
       Broadly adopted
   Custom research implementations
       For GPU's, supercomputers, etcetera
User
                                                 Program

                                                     (1) submit


                                                 Master

                               (2) schedule map        (2) schedule reduce


                     worker
split 0
                                                                                  (6) write   output
split 1                                              (5) remote read    worker
          (3) read                                                                             file 0
split 2                        (4) local write
                     worker
split 3
split 4                                                                                       output
                                                                        worker
                                                                                               file 1

                     worker


Input                 Map             Intermediate files                 Reduce               Output
 files               phase              (on local disk)                   phase                files




                     Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
User
                                                 Program

                                                     (1) submit


                                                 Master

                               (2) schedule map        (2) schedule reduce


                     worker
split 0
                                                                                  (6) write   output
split 1                                              (5) remote read    worker
          (3) read                                                                             file 0
split 2                        (4) local write
                     worker
split 3
split 4                                                                                       output
                                                                        worker
                                                                                               file 1

                     worker


Input                 Map             Intermediate files                 Reduce               Output
 files               phase              (on local disk)                   phase                files




                     Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
User
                                                           Program

                                                               (1) submit


                                                           Master

                                         (2) schedule map        (2) schedule reduce


                               worker
          split 0
                                                                                            (6) write   output
          split 1                                              (5) remote read    worker
                    (3) read                                                                             file 0
          split 2                        (4) local write
                               worker
          split 3
          split 4                                                                                       output
                                                                                  worker
                                                                                                         file 1

                               worker


          Input                 Map             Intermediate files                 Reduce               Output
           files               phase              (on local disk)                   phase                files


How do we get our input data to the map()'s on the workers?



                               Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
Distributed File System

   Don't move data to the workers... move workers to the data!
       Store data on the local disks of nodes in the cluster
       Start up the work on the node that has the data local

   A distributed files system is the answer
       GFS (Google File System) for Google's MapReduce
       HDFS (Hadoop Distributed File System) for Hadoop
GFS: Design decisions

   Files stored as chunks
       Fixed size (64MB)
   Reliability through replication
       Each chunk replicated across 3+ chunkservers
   Single master to coordinate access, keep metadata
       Simple centralized management
   No data caching
       Little benefit due to large datasets, streaming reads
   Simplify the API
       Push some of the issues onto the client (e.g., data layout)


               HDFS = GFS clone (same basic ideas)

                         Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
From GFS to HDFS

   Terminology differences:
       GFS Master = Hadoop NameNode
       GFS Chunkservers = Hadoop DataNode
       Chunk = Block
   Functional differences
       File appends in HDFS is relatively new
       HDFS performance is (likely) slower
       Blocksize is configurable by the client




                      We use Hadoop terminology
HDFS Architecture


                                                          HDFS namenode

Application                                                                  /foo/bar
                  (file name, block id)
                                                  File namespace              block 3df2
HDFS Client
                (block id, block location)




                                                  instructions to datanode

                                                                 datanode state
              (block id, byte range)
                                                HDFS datanode                     HDFS datanode
              block data
                                                Linux file system                 Linux file system

                                                                                                 




                           Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
Namenode Responsibilities

   Managing the file system namespace:
       Holds file/directory structure, metadata, file-to-block mapping, access
        permissions, etcetera
   Coordinating file operations
       Directs clients to DataNodes for reads and writes
       No data is moved through the NameNode
   Maintaining overall health:
       Periodic communication with the DataNodes
       Block re-replication and rebalancing
       Garbage collection
Putting everything together



                     namenode                  job submission node


             namenode daemon                          jobtracker




   tasktracker                     tasktracker                        tasktracker

datanode daemon                 datanode daemon                   datanode daemon

 Linux file system               Linux file system                 Linux file system

                                                                                  
   slave node                      slave node                         slave node




                         Jimmy Lin, University of Maryland / Twitter, 2011
Questions?

More Related Content

Hadoop.mapreduce

  • 1. Guest Lecture Eindhoven University of Technology Notes on Data-Intensive Processing with Hadoop MapReduce Evert Lammerts May 30, 2012 Image source: http://valley-of-the-shmoon.blogspot.com/2011/04/pushing-elephant-up-stairs.html
  • 2. To start with... About me Note on this lecture Adapted from Jimmy Lin's Cloud Computing course... http://www.umiacs.umd.edu/~jimmylin/cloud-2010-Spring/index.html and from Jimmy's slidedeck from the SIKS Big Data course and his talk at UvA http://www.umiacs.umd.edu/~jimmylin/ Today's slides available at http://www.slideshare.net/evertlammerts About you Big Data? Cloud computing? Supercomputing? Hadoop and / or MapReduce?
  • 3. The lecture Why Big Data? How Big Data? MapReduce Implementations
  • 4. Why Big Data? The Economist, Feb 25th 2010
  • 5. 1. Science The emergence of the 4th paradigm http://research.microsoft.com/en-us/collaboration/fourthparadigm/ CERN stores 15 PB LHC data per year, a fraction of the actual produced data Square Kilometer Array expectation: 10 PB / hour Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
  • 6. 2. Engineering Count and normalize http://infrawatch.liacs.nl/ Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
  • 7. 3. Commerce Know thy customers Data Insights Competitive advantages Google was processing 20 PB each day... in 2008! FaceBook's collected 25 TB of HTTP logs each day... in 2009! eBay had ~9 PB of user data, and a growth rate of more than 50 TB / day in 2011 Adapted from (Jimmy Lin, University of Maryland / Twitter, 2011)
  • 8. IEEE Intelligent Systems, March/April 2009
  • 9. s/knowledge/data/g Jimmy Lin, University of Maryland / Twitter, 2011
  • 10. Also see P. Russom, Big Data Analytics, The Data Warehousing Institute, 2011 James G. Kobielus, The Forrester Wave: Enterprise Hadoop Solutions, Forrester Research, 2012 James Manyika et al., Big data: The next frontier for innovation, competition, and productivity, McKinsey Global Institute, 2011 Dirk de Roos et al., Understanding Big Data: Analytics for Enterprise Class Hadoop and Streaming Data, IBM, 2011 Etcetera
  • 13. Divide and Conquer Work Partition w1 w2 w3 worker worker worker r1 r2 r3 Result Combine Jimmy Lin, University of Maryland / Twitter, 2011
  • 15. Challenges in Parallel systems How do we divide the work into separate tasks? How do we get these tasks to our workers? What if we have more tasks than workers? What if our tasks need to exchange information? What if workers crash? (That's no exception!) How do we aggregate results?
  • 16. Managing Parallel Applications A synchronization mechanism is needed To coordinate communication (like exchanging state) between workers To manage access to shared resources like data What if you don't? Mutual Exclusion Resource Starvation Race Conditions Dining philosophers, sleeping barber, cigarette smokers, readers-writers, producers-consumers, etcetera Managing parallelism is hard!
  • 18. Well known tools and patterns Programming models Shared Memory Message Passing Shared memory (pthreads) Memory Message passing (MPI) Design patterns P1 P2 P3 P4 P5 P1 P2 P3 P4 P5 Master-slave Producer-consumer Shared queues producer consumer master work queue slaves producer consumer Jimmy Lin, University of Maryland / Twitter, 2011
  • 20. to a datacenter
  • 22. Where to go from here The search for the right level of abstraction How do we build an architecture for a scaled environment? From HAL to DCAL Hiding parallel application management from the developer It's hard! Separating the what from the how The developer specifies the computation The runtime environment handles the execution Barosso, 2009
  • 23. Ideas on scaling Scale out, don't scale up Hard upper-bound on the capacity of a single machine No upper-bound on the amount of machines you can buy (in theory) When dealing with large data... Prefer sequential reads over random reads & rather not store a trillion small files, but a million big ones Disk access is slow, but throughput is reasonable! Try to understand when a NAS / SAN architecture is really necessary It's expensive to scale!
  • 25. An abstraction of typical large-data problems (1) Iterate over a large number of records (2) Extract something of interest from each (3) Shuffle and sort intermediate results (4) Aggregate intermediate results (5) Generate final output
  • 26. An abstraction of typical large-data problems (1) Iterate over a large number of records M (2) Extract something of interest from each A P (3) Shuffle and sort intermediate R results ED (4) Aggregate intermediate results U C (5) Generate final output E MapReduce provides a functional abstraction of step 2 and step 4
  • 27. Roots in functional programming Map(S: array, f()) Apply f(s S) for all items in S Fold(S: array, f()) Recursively apply f() to each item in S and the result of the previous operation, or nil if such an operation does not exist Source: Wikipedia
  • 28. MapReduce The programmer specifies two functions: map(k, v) <k', v'>* reduce(k', v'[ ]) <k', v'>* All values associated with the same key are sent to the same reducer The execution framework handles everything else
  • 29. k1 v1 k2 v2 k3 v3 k4 v4 k5 v5 k6 v6 map map map map a 1 b 2 c 3 c 6 a 5 c 2 b 7 c 8 Shuffle and Sort: aggregate values by keys a 1 5 b 2 7 c 2 3 6 8 reduce reduce reduce r1 s1 r2 s2 r3 s3 Jimmy Lin, University of Maryland / Twitter, 2011
  • 30. MapReduce Hello World: WordCount Question: how can we count unique words in a given text? Line-based input (a record is one line) Key: position of first character in the whole document Value: a line not including the EOL character Input looks like: Key: 0, value: a wise old owl lived in an oak Key: 31, value: the more he saw the less he spoke Key: 63, value: the less he spoke the more he heard Key: 99, value: why can't we all be like that wise old bird Output looks like: (a,1) (an,1) (be,1) (he,4) (in,1) (we,1) (all,1) (oak,1) (old,2) (owl,1) (saw,1) (the,4) (why,1) (bird,1) (less,2) (like,1) (more,2) (that,1) (wise,2) (can't,1) (heard,1) (lived,1) (spoke,2)
  • 32. MapReduce The programmer specifies two functions: map(k, v) <k', v'>* reduce(k', v'[ ]) <k', v'>* All values associated with the same key are sent to the same reducer The execution framework handles ? everything else ?
  • 33. MapReduce execution framework Handles scheduling Assigns map and reduce tasks to workers Handles data-awareness: moves processes to data Handles synchronization Gathers, sorts, and shuffles intermediate data Handles errors and faults Detects worker failures and restarts Handles communication with the distributed filesystem
  • 34. MapReduce The programmer specifies two functions: map (k, v) <k', v'>* reduce (k', v'[ ]) <k', v'>* All values associated with the same key are sent to the same reducer The execution framework handles everything else... Not quite... usually, programmers also specify: partition (k', number of partitions) partition for k' Often a simple hash of the key, e.g., hash(k') mod n Divides up key space for parallel reduce operations combine (k', v') <k', v'>* Mini-reducers that run in memory after the map phase Used as optimization to reduce network traffic
  • 35. k1 v1 k2 v2 k3 v3 k4 v4 k5 v5 k6 v6 map map map map a 1 b 2 c 3 c 6 a 5 c 2 b 7 c 8 combine combine combine combine a 1 b 2 c 9 a 5 c 2 b 7 c 8 partition partition partition partition Shuffle and Sort: aggregate values by keys a 1 5 b 2 7 c 2 9 8 3 6 reduce reduce reduce r1 s1 r2 s2 r3 s3 Jimmy Lin, University of Maryland / Twitter, 2011
  • 36. Quick note... The term MapReduce can refer to: The programming model The execution framework The specific implementation
  • 38. MapReduce implementations Google (C++) Dean & Ghemawat, MapReduce: simplified data processing on large clusters, 2004 Ghemawat, Gobioff, Leung, The Google File System, 2003 Apache Hadoop (Java) Open source implementation Originally led by Yahoo! Broadly adopted Custom research implementations For GPU's, supercomputers, etcetera
  • 39. User Program (1) submit Master (2) schedule map (2) schedule reduce worker split 0 (6) write output split 1 (5) remote read worker (3) read file 0 split 2 (4) local write worker split 3 split 4 output worker file 1 worker Input Map Intermediate files Reduce Output files phase (on local disk) phase files Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
  • 40. User Program (1) submit Master (2) schedule map (2) schedule reduce worker split 0 (6) write output split 1 (5) remote read worker (3) read file 0 split 2 (4) local write worker split 3 split 4 output worker file 1 worker Input Map Intermediate files Reduce Output files phase (on local disk) phase files Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
  • 41. User Program (1) submit Master (2) schedule map (2) schedule reduce worker split 0 (6) write output split 1 (5) remote read worker (3) read file 0 split 2 (4) local write worker split 3 split 4 output worker file 1 worker Input Map Intermediate files Reduce Output files phase (on local disk) phase files How do we get our input data to the map()'s on the workers? Jimmy Lin, Adapted from (Dean and Ghemawat, OSDI 2004)
  • 42. Distributed File System Don't move data to the workers... move workers to the data! Store data on the local disks of nodes in the cluster Start up the work on the node that has the data local A distributed files system is the answer GFS (Google File System) for Google's MapReduce HDFS (Hadoop Distributed File System) for Hadoop
  • 43. GFS: Design decisions Files stored as chunks Fixed size (64MB) Reliability through replication Each chunk replicated across 3+ chunkservers Single master to coordinate access, keep metadata Simple centralized management No data caching Little benefit due to large datasets, streaming reads Simplify the API Push some of the issues onto the client (e.g., data layout) HDFS = GFS clone (same basic ideas) Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
  • 44. From GFS to HDFS Terminology differences: GFS Master = Hadoop NameNode GFS Chunkservers = Hadoop DataNode Chunk = Block Functional differences File appends in HDFS is relatively new HDFS performance is (likely) slower Blocksize is configurable by the client We use Hadoop terminology
  • 45. HDFS Architecture HDFS namenode Application /foo/bar (file name, block id) File namespace block 3df2 HDFS Client (block id, block location) instructions to datanode datanode state (block id, byte range) HDFS datanode HDFS datanode block data Linux file system Linux file system Jimmy Lin, Adapted from (Ghemawat, SOSP 2003)
  • 46. Namenode Responsibilities Managing the file system namespace: Holds file/directory structure, metadata, file-to-block mapping, access permissions, etcetera Coordinating file operations Directs clients to DataNodes for reads and writes No data is moved through the NameNode Maintaining overall health: Periodic communication with the DataNodes Block re-replication and rebalancing Garbage collection
  • 47. Putting everything together namenode job submission node namenode daemon jobtracker tasktracker tasktracker tasktracker datanode daemon datanode daemon datanode daemon Linux file system Linux file system Linux file system slave node slave node slave node Jimmy Lin, University of Maryland / Twitter, 2011