River
A data workflow management system

Harel Ben Attia
Senior Software Engineer
 Tens of Billions of Recommendations per month
 Most major publishers in the World
 Hundreds of GBs of new data every day

Context
 Data Processing Workflows

 Multiple Types of Processing
   Rollups, Grouping, Filtering, Algorithm Calculations

 Multiple Stages of Processing
   Using the output of other processes as input

Problems
 Dependency Management
   Hardcoded into code/scripts
   Time-based using cron or another scheduler

 Logic is scattered around the system
   Developers need to take care of
    monitoring, alerts, permissions etc.
   Multiple Locations of Execution

Data Processing

River
 Execution Management (Ops / NOC)
      Full Execution History and Filtering
      Monitoring and Actionable Alerting
      Automatic Retries
      Web UI
      JobLogs

 Ease of Development (Developers)
    Declarative Data Processing Definitions
    Decentralized
         Shared Data, separate development

 Data Driven Dependencies
    Why?

Other Approaches

 [Diagram: Option 1: A, B, C. Option 2: A, J, B, C along a timeline t]

Other Approaches

 [Diagram: Option 2: A, J, B, C along a timeline t]

Other Approaches

 D fails
 D sends email
 Developer of D still works here
 Where is the code?

Other Approaches

 2am is a great hour for troubleshooting!
 D: "Data from C is missing"
 C: "The data of C is all there!"

Other Approaches

 "X:37 seems like a good time, C never finished after X:30 anyway"
 [Diagram: A, B, C and D along a timeline t]
 Job J had been working for more than a week before the incident

Other Approaches

Need to rerun processes B, C and D

Which hours failed?

How to run all of them for the specific hours?
  Without running A again?
  Without colliding with ongoing executions?

Other Approaches

 "A will never take more than 15 minutes, so X:20 is more than enough"
 [Diagram: A starting at X:00, followed by J, along a timeline t]
 A WILL eventually take longer
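The X:20 assumption can be made concrete with a small simulation. This is a hypothetical sketch, not River code: durations are in minutes after X:00, J is assumed to read A's output, and the 20-minute offset is the one quoted above. It compares a cron-style fixed start time with starting J on A's completion event.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Run:
    name: str
    start: int     # minutes after X:00
    duration: int  # minutes

    @property
    def end(self) -> int:
        return self.start + self.duration


def time_based(a_duration: int, j_offset: int = 20, j_duration: int = 5) -> tuple[Run, Run, bool]:
    """Cron style: J starts at a fixed offset whether or not A has finished."""
    a = Run("A", 0, a_duration)
    j = Run("J", j_offset, j_duration)
    # J is only correct if A's output was complete when J started
    return a, j, a.end <= j.start


def data_driven(a_duration: int, j_duration: int = 5) -> tuple[Run, Run, bool]:
    """Data driven: J starts when A emits its completion event."""
    a = Run("A", 0, a_duration)
    j = Run("J", a.end, j_duration)
    return a, j, a.end <= j.start


for minutes in (12, 25):
    print(minutes, time_based(minutes)[2], data_driven(minutes)[2])
# 12 True True
# 25 False True
```

The data-driven version never reads incomplete data; it simply starts later when A is slow.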

River
 Execution Management
      Full Execution History + Filtering and Searching
      Monitoring and Actionable Alerting
      Automatic Retries
      Web UI
      JobLogs

 Ease of Development
    Declarative Data Processing Definitions
    Decentralized
         Shared Data, separate development

 Data Driven Dependencies
    Why? Robustness, Reliability, Parallelism

River
 What?            When?
 Where?           How?

Execution Layer: the What
 Every data processing task is called a Job
 A Job can contain multiple Steps, such as:
   Importing from MySQL to Hive
   Hive Queries
   JDBC Queries
   Transferring data from Hive into MySQL and Cassandra
   Running External Commands:
    MapReduce, Java, bash, legacy code, etc.
 Jobs use Parameters

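A Job as described above is an ordered list of Steps that share one set of Parameters. The sketch below models that with hypothetical `Job` and `Step` classes; River's real job definitions are declarative and their actual types are not shown in the slides. It also assumes that a failing step aborts the remaining steps.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

Params = Dict[str, str]


@dataclass
class Step:
    name: str
    # What the step does: e.g. a Hive query, a JDBC query or an external command
    action: Callable[[Params], None]


@dataclass
class Job:
    name: str
    steps: List[Step] = field(default_factory=list)

    def run(self, params: Params) -> List[str]:
        """Run the steps in order with the same parameters.

        Assumption: an exception raised by a step propagates and the
        remaining steps are skipped. Returns the names of completed steps.
        """
        completed: List[str] = []
        for step in self.steps:
            step.action(params)
            completed.append(step.name)
        return completed


# Hypothetical job with two steps that only record what they would do
log: List[str] = []
job = Job("hourlyRollup", [
    Step("importFromMySQL", lambda p: log.append(f"import {p['date']}")),
    Step("hiveRollup", lambda p: log.append(f"rollup {p['date']} {p['hour']}")),
])
print(job.run({"date": "2012-10-10", "hour": "15"}))  # ['importFromMySQL', 'hiveRollup']
```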
Scheduling Layer: the When
 Each job registers for an event, which triggers its execution
 Each job emits an event when it completes
 Two kinds of events:
   Events that describe data availability
   Events that are time-dependent

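The register/emit model can be sketched as a tiny event loop. The `Scheduler` class and the `<job>.completed` event naming are assumptions for illustration; parameters, queues and failures are left out here.

```python
from collections import defaultdict, deque
from typing import Dict, List


class Scheduler:
    """Hypothetical sketch: each job registers on one event and emits a
    completion event when it finishes. Assumes the registrations form no cycles."""

    def __init__(self) -> None:
        self.listeners: Dict[str, List[str]] = defaultdict(list)

    def register(self, job: str, trigger_event: str) -> None:
        self.listeners[trigger_event].append(job)

    @staticmethod
    def completion_event(job: str) -> str:
        # Assumed naming convention, for illustration only
        return f"{job}.completed"

    def run(self, initial_event: str) -> List[str]:
        """Process events in arrival order; return jobs in execution order."""
        executed: List[str] = []
        events = deque([initial_event])
        while events:
            event = events.popleft()
            for job in self.listeners.get(event, []):
                executed.append(job)  # a real system would execute the job here
                events.append(self.completion_event(job))
        return executed


s = Scheduler()
s.register("A", "hourly")       # time-dependent event
s.register("B", "A.completed")  # data-availability events from here on
s.register("C", "B.completed")
s.register("D", "B.completed")
print(s.run("hourly"))  # ['A', 'B', 'C', 'D']
```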
The How and the Where
 Both handled by the infrastructure

 Integration with other systems
    Connecting to Hive/Hadoop/Cassandra
    Connecting to JDBC Databases
    Retries, throttling, timeouts
    Logical names for all data sources
      (e.g. readOnlyDataWarehouse, productionCassandra)

 Monitoring and Alerts
    Centralized management, email notifications and dashboards

 Location of Execution
    The actual location is hidden from the developer/ops

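Two of the infrastructure concerns above, logical data-source names and retries, are sketched below. The registry entries and URLs are made up, and the retry policy (fixed attempt count, linear backoff) is an assumption rather than River's documented behavior; throttling and timeouts are not modeled.

```python
import time
from typing import Callable, Dict, TypeVar

T = TypeVar("T")

# Hypothetical registry: jobs use logical names only, and the infrastructure
# resolves them to real endpoints. The URLs are made up for illustration.
DATA_SOURCES: Dict[str, str] = {
    "readOnlyDataWarehouse": "jdbc:mysql://dwh-replica.example:3306/dwh",
    "productionCassandra": "cassandra://cassandra-prod.example:9042",
}


def resolve(logical_name: str) -> str:
    if logical_name not in DATA_SOURCES:
        raise ValueError(f"unknown data source: {logical_name}")
    return DATA_SOURCES[logical_name]


def with_retries(action: Callable[[], T], attempts: int = 3, backoff_s: float = 0.0) -> T:
    """Call action until it succeeds or attempts run out, then re-raise the last error."""
    last_error: Exception = RuntimeError("no attempts made")
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # a real system would retry only transient errors
            last_error = error
            if attempt < attempts:
                time.sleep(backoff_s * attempt)  # linear backoff, illustrative
    raise last_error


calls = {"n": 0}


def flaky_lookup() -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")
    return resolve("productionCassandra")


print(with_retries(flaky_lookup))  # cassandra://cassandra-prod.example:9042
```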
River UI
 [Screenshot: job actions Fail, Download JobLog, and Restart Job / Job and Dependents]

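Restarting a job together with its dependents answers the earlier question of rerunning B, C and D without running A again. A minimal sketch, assuming the dependencies are available as a hypothetical job-to-downstream-jobs map, is a breadth-first walk from the restarted job. The returned order is only a listing; in a data-driven system each dependent would still start when its upstream job emits its completion event.

```python
from collections import deque
from typing import Dict, List

# Hypothetical dependency graph: job -> jobs triggered by its completion event
DOWNSTREAM: Dict[str, List[str]] = {
    "A": ["B"],
    "B": ["C"],
    "C": ["D"],
    "D": [],
}


def job_and_dependents(job: str, graph: Dict[str, List[str]]) -> List[str]:
    """Return the job plus all transitive dependents, breadth-first, each listed once."""
    order: List[str] = []
    seen = {job}
    queue = deque([job])
    while queue:
        current = queue.popleft()
        order.append(current)
        for nxt in graph.get(current, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return order


print(job_and_dependents("B", DOWNSTREAM))  # ['B', 'C', 'D'], A is not rerun
```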
Monitoring Dashboard

Steps
 Copy Data From JDBC to Hive
   sourceDB = productionDatabase
   sourceTable = myRawData
   targetCluster = onlineHadoopCluster
   targetHiveTable = rawDataTable
   Filter = date=#handledDate#

 Steps only contain what needs to be done

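The `#handledDate#` placeholder implies a substitution pass before a Step runs. The sketch below assumes placeholders are written as `#name#` with word characters only, and that an unknown name is an error; River's actual resolution rules are not described in the slides.

```python
import re
from typing import Dict

# Assumed placeholder syntax: #name#, where name is made of word characters
PLACEHOLDER = re.compile(r"#(\w+)#")


def substitute(template: str, params: Dict[str, str]) -> str:
    """Replace every #name# in template with params[name]."""
    def lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"missing parameter: {name}")
        return params[name]

    return PLACEHOLDER.sub(lookup, template)


# The step definition from the slide, as key/value pairs
step = {
    "sourceDB": "productionDatabase",
    "sourceTable": "myRawData",
    "targetCluster": "onlineHadoopCluster",
    "targetHiveTable": "rawDataTable",
    "Filter": "date=#handledDate#",
}
resolved = {key: substitute(value, {"handledDate": "2012-10-10"}) for key, value in step.items()}
print(resolved["Filter"])  # date=2012-10-10
```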
A bit more about triggers
 Triggers have parameters as well
   e.g. Date=2012-10-10,hour=15 and Date=2012-10-10,hour=19
 Parameters propagate through jobs and on to other triggers

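Parameter propagation can be illustrated with the two example triggers above. This hypothetical sketch parses the `key=value,key=value` format shown on the slide and hands the same parameters to every job in a chain, so the hour=15 and hour=19 runs stay separate. The job names are placeholders.

```python
from typing import Dict, List, Tuple

Params = Dict[str, str]


def parse_trigger_params(text: str) -> Params:
    """Parse a trigger parameter string such as 'Date=2012-10-10,hour=15'."""
    return dict(pair.split("=", 1) for pair in text.split(",") if pair)


def propagate(trigger_text: str, chain: List[str]) -> List[Tuple[str, Params]]:
    """Every job in a chain started by one trigger runs with that trigger's
    parameters, and hands them on in the trigger it emits."""
    params = parse_trigger_params(trigger_text)
    return [(job, dict(params)) for job in chain]


runs = propagate("Date=2012-10-10,hour=15", ["Job1", "Job2", "Job3"])
runs += propagate("Date=2012-10-10,hour=19", ["Job1", "Job2", "Job3"])
print(len(runs))  # 6: one execution per (job, hour)
```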
Developers' Point of View
 Automatic Retries
 Parameters Pass-through

 [Diagram: River architecture. Inside River: Trigger Queue, Trigger Manager, Execution Queue, Execution Manager, Spring Batch (backed by the Spring Batch DB) and the Topology. External systems are reached through the Hive/Hadoop, OS, Cassandra and JDBC Interfaces.]

Success Example
 [Diagram: Trigger T1 (Date=2012-01-02, hour=03) enters the Trigger Queue. The Topology maps T1 to Job1,Job2 and T3 to Job3. Job1 and Job2 pass through the Execution Queue and run via Spring Batch; on completion they emit T2 (from Job1) and T3 (from Job2) with the same parameters, and T3 triggers Job3.]

Failure Example
 [Diagram: Job2 (Date=2012-01-02, hour=03) fails and is restarted from the UI with the same parameters; on completion it emits T3, which triggers Job3.]

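The success and failure flows can be simulated end to end. This is a sketch under an assumed reading of the diagrams: T1 triggers Job1 and Job2, Job1 emits T2, Job2 emits T3, and T3 triggers Job3. Spring Batch and the external interfaces are not modeled, and `restart` only approximates the UI's Restart Job action.

```python
from collections import deque
from typing import Callable, Dict, List, Tuple

Params = Dict[str, str]

# Assumed reading of the slides, for illustration only
TOPOLOGY: Dict[str, List[str]] = {"T1": ["Job1", "Job2"], "T3": ["Job3"]}
EMITS: Dict[str, str] = {"Job1": "T2", "Job2": "T3"}


class River:
    def __init__(self, run_job: Callable[[str, Params], bool]) -> None:
        self.run_job = run_job
        self.trigger_queue: deque = deque()
        self.execution_queue: deque = deque()
        self.history: List[Tuple[str, str]] = []  # (job, "ok" or "failed")

    def fire(self, trigger: str, params: Params) -> None:
        self.trigger_queue.append((trigger, params))

    def restart(self, job: str, params: Params) -> None:
        """Approximates 'Restart Job': re-queue the job with the same parameters."""
        self.execution_queue.append((job, params))

    def process(self) -> None:
        while self.trigger_queue or self.execution_queue:
            # Trigger Manager: resolve triggers to jobs via the topology
            while self.trigger_queue:
                trigger, params = self.trigger_queue.popleft()
                for job in TOPOLOGY.get(trigger, []):
                    self.execution_queue.append((job, params))
            # Execution Manager: run queued jobs; only successes emit a trigger
            while self.execution_queue:
                job, params = self.execution_queue.popleft()
                ok = self.run_job(job, params)
                self.history.append((job, "ok" if ok else "failed"))
                if ok and job in EMITS:
                    self.fire(EMITS[job], params)


failing = {"Job2"}
river = River(lambda job, params: job not in failing)
params = {"Date": "2012-01-02", "hour": "03"}
river.fire("T1", params)
river.process()
print(river.history)      # [('Job1', 'ok'), ('Job2', 'failed')]
failing.clear()           # e.g. the environment issue is fixed
river.restart("Job2", params)
river.process()
print(river.history[2:])  # [('Job2', 'ok'), ('Job3', 'ok')]
```

Because a failed job emits nothing, Job3 never runs on partial data; it starts only after Job2 succeeds on restart.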
Notable Features
 Parameter Enrichment
    Example: #beginningOfMonth

 Precondition Expressions
    Example: isLastDayOfMonth(#handledDate)

 Data Comparison Capabilities
    Data Validations
    Supports Tolerance
       Absolute and Percentage margins

 Command Line and Java Clients
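The three features above can be sketched with the Python standard library. The parameter name `handledDate` comes from the step example, but the ISO date format and the choice to accept a value that is within either the absolute or the percentage margin are assumptions; River's expression language is not shown in the slides.

```python
import calendar
from datetime import date
from typing import Dict


def enrich(params: Dict[str, str]) -> Dict[str, str]:
    """Parameter enrichment: derive beginningOfMonth from handledDate."""
    d = date.fromisoformat(params["handledDate"])
    return {**params, "beginningOfMonth": d.replace(day=1).isoformat()}


def is_last_day_of_month(iso_day: str) -> bool:
    """Precondition: true only on the last calendar day of the month."""
    d = date.fromisoformat(iso_day)
    return d.day == calendar.monthrange(d.year, d.month)[1]


def within_tolerance(expected: float, actual: float,
                     absolute: float = 0.0, percentage: float = 0.0) -> bool:
    """Data validation: accept actual if it is within the absolute margin
    OR within the percentage margin of expected (assumed combination rule)."""
    diff = abs(actual - expected)
    return diff <= absolute or diff <= abs(expected) * percentage / 100.0


print(enrich({"handledDate": "2012-10-31"})["beginningOfMonth"])  # 2012-10-01
print(is_last_day_of_month("2012-10-31"))            # True
print(is_last_day_of_month("2012-02-28"))            # False, 2012 is a leap year
print(within_tolerance(1000, 1015, percentage=2))    # True, 15 <= 20
print(within_tolerance(1000, 1015, absolute=10))     # False, 15 > 10
```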
River at Outbrain
   6 River Instances Running
   5 Teams
   ~4100 Jobs running every day
   ~50 Different Job Types

 Job Failures due to environment issues have
  almost no overhead
 Automatic restarts of jobs when data arrives late
Illustration by Chris Whetzel

                 Future Plans
   Multiple Dependencies
   Offline Job Testing Capabilities
   Improved DSL for Job Definitions
   Support for Master/Worker River machines
   Job Priorities
   Analysis Tools

    Outbrain is working on Open Sourcing River
Thank You

Harel Ben Attia                    @harelba on Twitter
harel@outbrain.com   http://www.linkedin.com/in/harelba

Outbrain River Presentation at Reversim Summit 2013


