Rationale
Distributed systems like GoogleBig data debugging
Big data analytics is the process of examining large data sets to uncover hidden patterns, unknown correlations, market trends, customer preferences and other useful business information. They applyChallenges in big data debugging
Massive scale
According to an EMC/IDC study: * 2.8ZB of data were created and replicated in 2012, * the digital universe will double every two years between now and 2020, and * there will be approximately 5.2TB of data for every person in 2020. Working with this scale of data has become very challenging.Unstructured data
Proposed solution
Data provenance or data lineage can be used to make the debugging of big data pipeline easier. This necessitates the collection of data about data transformations. The below section will explain data provenance in more detail.Data provenance
Data provenance provides a historical record of the data and its origins. The provenance of data which is generated by complex transformations such as workflows is of considerable value to scientists. From it, one can ascertain the quality of the data based on its ancestral data and derivations, track back sources of errors, allow automated re-enactment of derivations to update a data, and provide attribution of data sources. Provenance is also essential to the business domain where it can be used to drill down to the source of data in aLineage capture
Intuitively, for an operator T producing output o, lineage consists of triplets of form , where I is the set of inputs to T used to derive o. Capturing lineage for each operator T in a dataflow enables users to ask questions such as “Which outputs were produced by an input i on operator T ?” and “Which inputs produced output o in operator T ?” A query that finds the inputs deriving an output is called a backward tracing query, while one that finds the outputs produced by an input is called a forward tracing query.Robert Ikeda, Hyunjung Park, and Jennifer Widom. Provenance for generalized map and reduce workflows. In Proc. of CIDR, January 2011. Backward tracing is useful for debugging, while forward tracing is useful for tracking error propagation. Tracing queries also form the basis for replaying an original dataflow. However, to efficiently use lineage in a DISC system, we need to be able to capture lineage at multiple levels (or granularities) of operators and data, capture accurate lineage for DISC processing constructs and be able to trace through multiple dataflow stages efficiently. DISC system consists of several levels of operators and data, and different use cases of lineage can dictate the level at which lineage needs to be captured. Lineage can be captured at the level of the job, using files and giving lineage tuples of form , lineage can also be captured at the level of each task, using records and giving, for example, lineage tuples of form . The first form of lineage is called coarse-grain lineage, while the second form is called fine-grain lineage. Integrating lineage across different granularities enables users to ask questions such as “Which file read by a MapReduce job produced this particular output record?” and can be useful in debugging across different operator and data granularities within a dataflow. To capture end-to-end lineage in a DISC system, we use the Ibis model, which introduces the notion of containment hierarchies for operators and data. Specifically, Ibis proposes that an operator can be contained within another and such a relationship between two operators is called operator containment. "Operator containment implies that the contained (or child) operator performs a part of the logical operation of the containing (or parent) operator." For example, a MapReduce task is contained in a job. Similar containment relationships exist for data as well, called data containment. Data containment implies that the contained data is a subset of the containing data (superset).Prescriptive data lineage
The concept of prescriptive data lineage combines both the logical model (entity) of how that data should flow with the actual lineage for that instance. Data lineage and provenance typically refers to the way or the steps a dataset took to reach its current state of Data lineage, as well as all copies or derivatives. However, simply looking back at the audit or log correlations to determine the lineage from a forensic point of view fails for certain data management cases. For instance, it is impossible to determine with certainty if the route a data workflow took was correct or in compliance without the logic model. Only by combining a logical model with atomic forensic events can proper activities be validated: # Authorized copies, joins, or CTAS operations # Mapping of processing to the systems that those process are run on # Ad-Hoc versus established processing sequences Many certified compliance reports require provenance of data flow as well as the end state data for a specific instance. With these types of situations, any deviation from the prescribed path need to be accounted and potentially remediated. This marks a shift from purely "looking back" to a framework, which is better suited to capture compliance workflows.Active versus lazy lineage
Lazy lineage collection typically captures only coarse-grain lineage at run time. These systems incur low capture overheads due to the small amount of lineage they capture. However, to answer fine-grain tracing queries, they must replay the data flow on all (or a large part) of its input and collect fine-grain lineage during the replay. This approach is suitable for forensic systems, where a user wants to debug an observed bad output. Active collection systems capture entire lineage of the data flow at run time. The kind of lineage they capture may be coarse-grain or fine-grain, but they do not require any further computations on the data flow after its execution. Active fine-grain lineage collection systems incur higher capture overheads than lazy collection systems. However, they enable sophisticated replay and debugging.Actors
An actor is an entity that transforms data; it may be a Dryad vertex, individual map and reduce operators, a MapReduce job, or an entire dataflow pipeline. Actors act as black-boxes and the inputs and outputs of an actor are tapped to capture lineage in the form of associations, where an association is a triplet that relates an input i with an output o for an actor T . The instrumentation thus captures lineage in a dataflow one actor at a time, piecing it into a set of associations for each actor. The system developer needs to capture the data an actor reads (from other actors) and the data an actor writes (to other actors). For example, a developer can treat the Hadoop Job Tracker as an actor by recording the set of files read and written by each job. Dionysios Logothetis, Soumyarupa De, and Kenneth Yocum. 2013. Scalable lineage capture for debugging DISC analytics. In Proceedings of the 4th annual Symposium on Cloud Computing (SOCC '13). ACM, New York, NY, USA, Article 17, 15 pages.Associations
Association is a combination of the inputs, outputs and the operation itself. The operation is represented in terms of a black box also known as the actor. The associations describe the transformations that are applied on the data. The associations are stored in the association tables. Each unique actor is represented by its own association table. An association itself looks like where i is the set of inputs to the actor T and o is set of outputs given produced by the actor. Associations are the basic units of Data Lineage. Individual associations are later clubbed together to construct the entire history of transformations that were applied to the data.Architecture
Big data systems scale horizontally i.e. increase capacity by adding new hardware or software entities into the distributed system. The distributed system acts as a single entity in the logical level even though it comprises multiple hardware and software entities. The system should continue to maintain this property after horizontal scaling. An important advantage of horizontal scalability is that it can provide the ability to increase capacity on the fly. The biggest plus point is that horizontal scaling can be done using commodity hardware. The horizontal scaling feature of Big Data systems should be taken into account while creating the architecture of lineage store. This is essential because the lineage store itself should also be able to scale in parallel with the Big data system. The number of associations and amount of storage required to store lineage will increase with the increase in size and capacity of the system. The architecture of Big data systems makes the use of a single lineage store not appropriate and impossible to scale. The immediate solution to this problem is to distribute the lineage store itself. The best-case scenario is to use a local lineage store for every machine in the distributed system network. This allows the lineage store also to scale horizontally. In this design, the lineage of data transformations applied to the data on a particular machine is stored on the local lineage store of that specific machine. The lineage store typically stores association tables. Each actor is represented by its own association table. The rows are the associations themselves and columns represent inputs and outputs. This design solves 2 problems. It allows horizontal scaling of the lineage store. If a single centralized lineage store was used, then this information had to be carried over the network, which would cause additional network latency. The network latency is also avoided by the use of a distributed lineage store.Data flow reconstruction
The information stored in terms of associations needs to be combined by some means to get the data flow of a particular job. In a distributed system a job is broken down into multiple tasks. One or more instances run a particular task. The results produced on these individual machines are later combined together to finish the job. Tasks running on different machines perform multiple transformations on the data in the machine. All the transformations applied to the data on a machines is stored in the local lineage store of that machines. This information needs to be combined together to get the lineage of the entire job. The lineage of the entire job should help the data scientist understand the data flow of the job and he/she can use the data flow to debug the big data pipeline. The data flow is reconstructed in 3 stages.Association tables
The first stage of the data flow reconstruction is the computation of the association tables. The association tables exists for each actor in each local lineage store. The entire association table for an actor can be computed by combining these individual association tables. This is generally done using a series of equality joins based on the actors themselves. In few scenarios the tables might also be joined using inputs as the key. Indexes can also be used to improve the efficiency of a join. The joined tables need to be stored on a single instance or a machine to further continue processing. There are multiple schemes that are used to pick a machine where a join would be computed. The easiest one being the one with minimum CPU load. Space constraints should also be kept in mind while picking the instance where join would happen.Association graph
The second step in data flow reconstruction is computing an association graph from the lineage information. The graph represents the steps in the data flow. The actors act as vertices and the associations act as edges. Each actor T is linked to its upstream and downstream actors in the data flow. An upstream actor of T is one that produced the input of T, while a downstream actor is one that consumes the output of T . Containment relationships are always considered while creating the links. The graph consists of three types of links or edges.Explicitly specified links
The simplest link is an explicitly specified link between two actors. These links are explicitly specified in the code of a machine learning algorithm. When an actor is aware of its exact upstream or downstream actor, it can communicate this information to lineage API. This information is later used to link these actors during the tracing query. For example, in theLogically inferred links
Developers can attach data flowImplicit links through data set sharing
In distributed systems, sometimes there are implicit links, which are not specified during execution. For example, an implicit link exists between an actor that wrote to a file and another actor that read from it. Such links connect actors which use a common data set for execution. The dataset is the output of the first actor and is the input of the actor following it.Topological sorting
The final step in the data flow reconstruction is theTracing and replay
This is the most crucial step in big data debugging. The captured lineage is combined and processed to obtain the data flow of the pipeline. The data flow helps the data scientist or a developer to look deeply into the actors and their transformations. This step allows the data scientist to figure out the part of the algorithm that is generating the unexpected output. A big data pipeline can go wrong in two broad ways. The first is a presence of a suspicious actor in the data-flow. The second being the existence of outliers in the data. The first case can be debugged by tracing the data-flow. By using lineage and data-flow information together a data scientist can figure out how the inputs are converted into outputs. During the process actors that behave unexpectedly can be caught. Either these actors can be removed from the data flow or they can be augmented by new actors to change the data-flow. The improved data-flow can be replayed to test the validity of it. Debugging faulty actors include recursively performing coarse-grain replay on actors in the data-flow, which can be expensive in resources for long dataflows. Another approach is to manually inspect lineage logs to find anomalies, which can be tedious and time-consuming across several stages of a data-flow. Furthermore, these approaches work only when the data scientist can discover bad outputs. To debug analytics without known bad outputs, the data scientist need to analyze the data-flow for suspicious behavior in general. However, often, a user may not know the expected normal behavior and cannot specify predicates. This section describes a debugging methodology for retrospectively analyzing lineage to identify faulty actors in a multi-stage data-flow. We believe that sudden changes in an actor’s behavior, such as its average selectivity, processing rate or output size, is characteristic of an anomaly. Lineage can reflect such changes in actor behavior over time and across different actor instances. Thus, mining lineage to identify such changes can be useful in debugging faulty actors in a data-flow. The second problem i.e. the existence of outliers can also be identified by running the data-flow step wise and looking at the transformed outputs. The data scientist finds a subset of outputs that are not in accordance to the rest of outputs. The inputs which are causing these bad outputs are the outliers in the data. This problem can be solved by removing the set of outliers from the data and replaying the entire data-flow. It can also be solved by modifying the machine learning algorithm by adding, removing or moving actors in the data-flow. The changes in the data-flow are successful if the replayed data-flow does not produce bad outputs.Challenges
Even though the use of data lineage approaches is a novel way of debugging of big data pipelines, the process is not simple. The challenges include scalability of the lineage store, fault tolerance of the lineage store, accurate capture of lineage for black box operators and many others. These challenges must be considered carefully and trade offs between them need to be evaluated to make a realistic design for data lineage capture.Scalability
DISC systems are primarily batch processing systems designed for high throughput. They execute several jobs per analytics, with several tasks per job. The overall number of operators executing at any time in a cluster can range from hundreds to thousands depending on the cluster size. Lineage capture for these systems must be able scale to both large volumes of data and numerous operators to avoid being a bottleneck for the DISC analytics.Fault tolerance
Lineage capture systems must also be fault tolerant to avoid rerunning data flows to capture lineage. At the same time, they must also accommodate failures in the DISC system. To do so, they must be able to identify a failed DISC task and avoid storing duplicate copies of lineage between the partial lineage generated by the failed task and duplicate lineage produced by the restarted task. A lineage system should also be able to gracefully handle multiple instances of local lineage systems going down. This can achieved by storing replicas of lineage associations in multiple machines. The replica can act like a backup in the event of the real copy being lost.Black-box operators
Lineage systems for DISC dataflows must be able to capture accurate lineage across black-box operators to enable fine-grain debugging. Current approaches to this include Prober, which seeks to find the minimal set of inputs that can produce a specified output for a black-box operator by replaying the data-flow several times to deduce the minimal set, and dynamic slicing, as used by Zhang et al. to capture lineage for NoSQL operators through binary rewriting to compute dynamic slices. Although producing highly accurate lineage, such techniques can incur significant time overheads for capture or tracing, and it may be preferable to instead trade some accuracy for better performance. Thus, there is a need for a lineage collection system for DISC dataflows that can capture lineage from arbitrary operators with reasonable accuracy, and without significant overheads in capture or tracing.Efficient tracing
Tracing is essential for debugging, during which, a user can issue multiple tracing queries. Thus, it is important that tracing has fast turnaround times. Ikeda et al. can perform efficient backward tracing queries for MapReduce dataflows, but are not generic to different DISC systems and do not perform efficient forward queries. Lipstick, a lineage system for Pig, while able to perform both backward and forward tracing, is specific to Pig and SQL operators and can only perform coarse-grain tracing for black-box operators. Thus, there is a need for a lineage system that enables efficient forward and backward tracing for generic DISC systems and dataflows with black-box operators.Sophisticated replay
Replaying only specific inputs or portions of a data-flow is crucial for efficient debugging and simulating what-if scenarios. Ikeda et al. present a methodology for lineage-based refresh, which selectively replays updated inputs to recompute affected outputs.Robert Ikeda, Semih Salihoglu, and Jennifer Widom. Provenance-based refresh in data-oriented workflows. In Proceedings of the 20th ACM international conference on Information and knowledge management, CIKM ’11, pages 1659–1668, New York, NY, USA, 2011. ACM. This is useful during debugging for re-computing outputs when a bad input has been fixed. However, sometimes a user may want to remove the bad input and replay the lineage of outputs previously affected by the error to produce error-free outputs. We call this exclusive replay. Another use of replay in debugging involves replaying bad inputs for step-wise debugging (called selective replay). Current approaches to using lineage in DISC systems do not address these. Thus, there is a need for a lineage system that can perform both exclusive and selective replays to address different debugging needs.Anomaly detection
One of the primary debugging concerns in DISC systems is identifying faulty operators. In long dataflows with several hundreds of operators or tasks, manual inspection can be tedious and prohibitive. Even if lineage is used to narrow the subset of operators to examine, the lineage of a single output can still span several operators. There is a need for an inexpensive automated debugging system, which can substantially narrow the set of potentially faulty operators, with reasonable accuracy, to minimize the amount of manual examination required.See also
*References
{{Reflist, 33em Data management Distributed computing problems Big data