How to Build Spark Lineage for Data Lakes
Better understand the intricacies of Spark and how to build a lineage graph for data lakes.
When a data pipeline breaks, data engineers need to immediately understand where the rupture occurred and what has been impacted. Data downtime is costly.
Without data lineage—a map of how assets are connected and data moves across its lifecycle—data engineers might as well conduct their incident triage and root cause analysis blindfolded.
How We Automate SQL Lineage
Developing data lineage for SQL is a much different process than developing Spark lineage.
To retrieve data using SQL, a user would write and execute a query, which is then typically stored in a log. These SQL queries contain all the breadcrumbs necessary to trace which columns or fields from specific tables are feeding other tables downstream.
For example, we can look at this SQL query which will display the outcomes of at-bats for a baseball team’s players…
SELECT player.first_name, player.last_name, bat.date, bat_outcome.outcome_text
FROM player
INNER JOIN bat ON bat.player_id = player.id
INNER JOIN bat_outcome ON bat.bat_outcome_id = bat_outcome.id
… and we can understand the connections between the above player, bat, and bat_outcome tables.
You can see the downstream field-to-field relationships that feed the resulting "at-bat outcome" table in the SELECT clause, and the table-to-field dependencies in the non-SELECT clauses (the FROM and JOIN conditions).
Metadata from the data warehouse/lake and from the BI tool of record can then be used to map the dependencies between the tables and dashboards.
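To make that concrete, here is a minimal Scala sketch of the edges you could read straight out of the query above; the case classes are illustrative only and are not Monte Carlo's internal model.

// Illustrative only -- not Monte Carlo's internal representation.
case class ColumnEdge(sourceTable: String, sourceColumn: String, targetColumn: String)

// Field-to-field edges read from the SELECT clause:
val fieldEdges = Seq(
  ColumnEdge("player", "first_name", "first_name"),
  ColumnEdge("player", "last_name", "last_name"),
  ColumnEdge("bat", "date", "date"),
  ColumnEdge("bat_outcome", "outcome_text", "outcome_text")
)

// Table-level dependencies read from the FROM/JOIN clauses:
val upstreamTables = Seq("player", "bat", "bat_outcome")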
Parsing all this manually to develop end-to-end lineage is possible, but it’s tedious.
It also becomes outdated virtually the moment it’s mapped as your environment continues to ingest more data and you continue to layer on additional solutions.
The way we automate it is by using a homegrown data collector to grab our customers' SQL logs from their data warehouse or lake and stream that data to the different components of our data pipelines. We leverage the open source ANTLR parser, which we heavily customized for various dialects of SQL, in a Java-based Lambda function to comb through the query logs and generate lineage data.
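As a rough sketch of what that function does (written in Scala for consistency with the rest of this post; LineageSqlParser is a hypothetical stand-in for the customized ANTLR-generated parser):

// Hypothetical stand-in for the customized, ANTLR-generated SQL parser.
// A naive regex is used here purely as a placeholder for the real parse-tree walk.
object LineageSqlParser {
  private val tableRef = raw"(?i)\b(?:from|join)\s+([A-Za-z_][A-Za-z0-9_.]*)".r
  def sourceTables(query: String): Seq[String] =
    tableRef.findAllMatchIn(query).map(_.group(1)).toSeq.distinct
}

// Core loop: comb each logged query and emit the tables it reads from.
def lineageFromQueryLog(queryLog: Seq[String]): Map[String, Seq[String]] =
  queryLog.map(query => query -> LineageSqlParser.sourceTables(query)).toMap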
The back-end architecture of our field-level SQL lineage solution looks something like this:
Easy? No. Easy compared to Spark lineage? Absolutely.
How We Solved End-to-End Spark Lineage
Apache Spark doesn't quite work the same way. Spark supports several different programming interfaces for creating jobs, such as Scala, Python, or R.
Regardless of which programming interface is used, the code gets interpreted and compiled into Spark commands. Behind the scenes, there is no such thing as a concise query, or a log of those queries.
Following are minimal examples from Databricks notebooks in Python, Scala, and R that all do the same thing: load a CSV file into a Spark DataFrame (the file path is illustrative).
Python
%python
df = spark.read.format("csv").option("header", "true").load("/path/to/file.csv")
Scala
%scala
val df = spark.read.format("csv").option("header", "true").load("/path/to/file.csv")
R
%r
df <- read.df("/path/to/file.csv", source = "csv", header = "true")
After Spark interprets the programmatic code and compiles the commands, it creates an execution graph (a DAG or Directed Acyclic Graph) of all the sequential steps to read data from the source(s), perform a series of transformations, and write it to an output location.
That makes the DAG the equivalent of a SQL execution plan. Integrating with it is the holy grail of Spark lineage because it contains all the information needed to understand how data moves through the data lake and how everything is connected.
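You can inspect that plan yourself from any Spark session. For example, in Scala (assuming a spark session is in scope, as in a notebook or shell; the path and column names are illustrative):

// Build a small read -> transform job and print its plans.
val bats = spark.read.format("csv").option("header", "true").load("/path/to/bats.csv")
val outcomes = bats.groupBy("bat_outcome_id").count()

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan --
// the same structures a lineage listener gets to inspect.
outcomes.explain(true)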
Spark has an internal framework of QueryExecutionListeners: you can configure Spark so that, whenever a command is executed, the details of that execution are passed to your registered listener.
For example, below is the source code for the listener implementation used by an open-sourced listening agent.
package za.co.absa.spline.harvester.listener

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import za.co.absa.spline.harvester.SparkLineageInitializer

class SplineQueryExecutionListener extends QueryExecutionListener {

  private val maybeListener: Option[QueryExecutionListener] = {
    val sparkSession = SparkSession.getActiveSession
      .orElse(SparkSession.getDefaultSession)
      .getOrElse(throw new IllegalStateException("Session is unexpectedly missing. Spline cannot be initialized."))

    new SparkLineageInitializer(sparkSession).createListener(isCodelessInit = true)
  }

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    maybeListener.foreach(_.onSuccess(funcName, qe, durationNs))
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    maybeListener.foreach(_.onFailure(funcName, qe, exception))
  }
}
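Wiring a listener like this into a Spark session can be done either programmatically or through configuration. Here is a minimal sketch, assuming the agent jar that provides this class is already on the cluster's classpath:

import org.apache.spark.sql.SparkSession
import za.co.absa.spline.harvester.listener.SplineQueryExecutionListener

val spark = SparkSession.builder().appName("lineage-demo").getOrCreate()

// Option 1: register the listener on an existing session.
spark.listenerManager.register(new SplineQueryExecutionListener)

// Option 2: let Spark instantiate it from configuration instead, e.g. by setting
// spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
// on spark-submit or in the cluster's Spark config.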
Instead of building a series of listeners from scratch, we decided to take advantage of that open source technology, Spline.
Smart developers had already invested several years into building a Spark agent to listen for these events, capture the lineage metadata, and transform it into graph format which we could then receive via a REST API (other options for receiving this data are available as well).
Once we have that representation of the execution plan, we send it to the Integration Gateway and then a normalizer which converts it into Monte Carlo’s internal representation of a lineage event.
From there, it’s integrated with other sources of lineage and metadata to provide a single end-to-end view for each customer.
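As a purely illustrative sketch of that normalization step (none of these names reproduce Spline's actual payload or Monte Carlo's internal schema):

// Hypothetical, simplified shapes -- for illustration only.
case class SplineWritePlan(outputSource: String, inputSources: Seq[String])
case class LineageEvent(destination: String, sources: Seq[String], producer: String)

def normalize(plan: SplineWritePlan): LineageEvent =
  LineageEvent(
    destination = plan.outputSource,
    sources = plan.inputSources,
    producer = "spark"
  )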
It's a really elegant solution… here's why it doesn't work.
The Challenges of Spark Lineage
What makes Spark difficult from a lineage perspective is also what makes it great as a framework for processing large amounts of unstructured data: namely, how extensible it is.
You can run Spark jobs across solutions like AWS Glue, EMR, and Databricks. In fact, there are multiple ways you can run Spark jobs in Databricks alone.
Configuring our Spark lineage solution—specifically how you add JAR files to Spark’s runtime classpath—would vary depending on how and where our customers ran their Spark jobs across these solutions and what combinations of Scala and Spark versions those solutions leveraged.
At Monte Carlo, we strongly emphasize ease-of-use and time-to-value. When you find yourself inserting a table like this into the draft documentation, it may be a sign to re-evaluate the solution.
The second challenge is that, like SQL statements, the vocabulary of Spark commands is ever expanding. But, since it is a newer framework than SQL, it’s growing at a slightly faster rate.
Every time a new command is introduced, code has to be written to extract the lineage metadata from that command. As a result, there were gaps in Spline’s parsing capabilities with commands that weren’t yet supported.
Unfortunately, many of these gaps needed to be filled for our customers' use cases. For example, a large biotech company needed coverage for the Spark MERGE command, which, just like the SQL MERGE statement does for tables, combines two dataframes by inserting what's new from the incoming dataframe and updating any existing records it finds.
For example, taking our simplistic baseball tables from before, this is how the new Spark MERGE command could be used to add new at-bats, update previously existing at-bats with corrected data, or maybe even delete at-bats that are so old we don’t care about them any more.
MERGE INTO bat
USING bat_stage
  ON bat.player_id = bat_stage.player_id
  AND bat.opponent_id = bat_stage.opponent_id
  AND bat.date = bat_stage.date
  AND bat.at_bat_number = bat_stage.at_bat_number
WHEN MATCHED THEN
  UPDATE SET bat.bat_outcome_id = bat_stage.bat_outcome_id
WHEN NOT MATCHED THEN
  INSERT (player_id, opponent_id, date, at_bat_number, bat_outcome_id)
  VALUES (bat_stage.player_id, bat_stage.opponent_id, bat_stage.date, bat_stage.at_bat_number, bat_stage.bat_outcome_id)
It’s a relatively new command and Spline doesn’t support it. Additionally, Databricks has developed their own implementation of the MERGE statement into which there is no public visibility.
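For reference, the same upsert is often expressed through the Delta Lake DataFrame API rather than SQL. This is a sketch that assumes bat is a Delta table, bat_stage is registered as a table, and the delta-spark library is available on the cluster:

import io.delta.tables.DeltaTable

// Assumes `bat` is a Delta table and `bat_stage` holds the incoming at-bats.
val batTable = DeltaTable.forName(spark, "bat")
val batStage = spark.table("bat_stage")

batTable.as("bat")
  .merge(
    batStage.as("bat_stage"),
    "bat.player_id = bat_stage.player_id AND bat.opponent_id = bat_stage.opponent_id AND " +
      "bat.date = bat_stage.date AND bat.at_bat_number = bat_stage.at_bat_number")
  .whenMatched()
  .updateExpr(Map("bat_outcome_id" -> "bat_stage.bat_outcome_id"))
  .whenNotMatched()
  .insertExpr(Map(
    "player_id"      -> "bat_stage.player_id",
    "opponent_id"    -> "bat_stage.opponent_id",
    "date"           -> "bat_stage.date",
    "at_bat_number"  -> "bat_stage.at_bat_number",
    "bat_outcome_id" -> "bat_stage.bat_outcome_id"))
  .execute()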
These are big challenges, sure, but they also have solutions.
We could ensure there is more client hand holding for Spark lineage configuration. We could hire and deploy an army of Scala ninjas to contribute support for new commands back to the Spline agent. We could even get cheeky and reverse engineer how to derive lineage from Databricks’ MERGE command implementation.
A good engineer can build solutions for hard problems. A great engineer takes a step back and asks, “Is the juice worth the squeeze? Is there a better way this can be done?”
Oftentimes buying or integrating with off-the-shelf solutions is not only more time efficient, but it prevents your team from accruing technical debt. So we went in another direction.
Partnering with Databricks
During the beta-testing of our Spark lineage solution, we found the primary use case for virtually every customer was for lineage within Databricks.
We found a better solution was to integrate with Databricks data lineage via the Unity Catalog. Some of the features in Databricks' lineage solution include:
- Automated run-time lineage: Unity Catalog automatically captures lineage generated by operations executed in Databricks. This helps data teams save significant time compared to manually tagging the data to create a lineage graph.
- Support for all workloads: Lineage is not limited to just SQL. It works across all workloads in any language supported by Databricks: Python, SQL, R, and Scala. This empowers all personas—data analysts, data scientists, ML experts—to augment their tools with data intelligence and context surrounding the data, resulting in better insights.
- Lineage at column level granularity: The Unity Catalog captures data lineage for tables, views, and columns. This information is displayed in real-time, enabling data teams to have a granular view of how data flows both upstream and downstream from a particular table or column in the lakehouse with just a few clicks.
- Lineage for notebooks, workflows, and dashboards: Unity Catalog can also capture lineage associated with non-data entities, such as notebooks, workflows, and dashboards. This helps with end-to-end visibility into how data is used in your organization. As a result, you can answer key questions like, “if I deprecate this column, who is impacted?”
- Built-in security: Lineage graphs in Unity Catalog are privilege-aware and share the same permission model as Unity Catalog. If users do not have access to a table, they will not be able to explore the lineage associated with the table, adding an additional layer of security for privacy considerations.
- Easily exportable via REST API: Lineage can be visualized in the Data Explorer in near real-time, and retrieved via REST API to support integrations with our catalog partners.
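To illustrate that last point, here is a rough Scala sketch of pulling table lineage over REST. The endpoint path and parameters are assumptions based on Databricks' documented lineage-tracking API at the time of writing and should be checked against the current docs; the workspace URL, token, and table name are placeholders.

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Placeholders: set these for your own workspace.
val workspaceUrl = sys.env("DATABRICKS_HOST")   // e.g. https://<workspace>.cloud.databricks.com
val token        = sys.env("DATABRICKS_TOKEN")

// Endpoint and parameter names are assumptions -- verify against current Databricks docs.
val request = HttpRequest.newBuilder()
  .uri(URI.create(s"$workspaceUrl/api/2.0/lineage-tracking/table-lineage?table_name=main.default.bat"))
  .header("Authorization", s"Bearer $token")
  .GET()
  .build()

val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
println(response.body()) // JSON describing upstream and downstream entities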
I hope our experience has helped shed some light on the intricacies of both automated SQL lineage and Spark lineage.
Published at DZone with permission of Lior Gavish.