Building Robust Data Pipelines with Delta Lake
May 2019
Who am I

● Software Engineer – Databricks – "We make your streams come true"
● Apache Spark Committer
● MS in Management Science & Engineering – Stanford University
● BS in Mechanical Engineering – Bogazici University, Istanbul
Today, we’re going to ride a time machine
Let’s go back to 2014…
Evolution of Data Pipelines @ Databricks, Circa MMXIV (2014)

[Diagram: multiple customer deployments, each shipping logs to Amazon S3 via an hourly* S3-copy batch job]
Data Pipeline V1
• Took 1 engineer ~1 week to implement
• Was pretty robust for the early days of Databricks
• … until we got to 30+ customers
  • File listing on S3 quickly became the bottleneck
  • *Eventually, job duration rose to 8 hours
Fast-forward to 2016…
Evolution of Data Pipelines @ Databricks, Circa 2016

[Diagram: multiple deployments stream events continuously into Amazon Kinesis; the data is streamed out continuously and compacted once a day]
Data Pipeline V2
• Scaled very well
• ETL'ing the data became fast
• Took 2 engineers ~8 months
• Query performance / experience got worse
  • Lots of small files
  • Compaction jobs impacted queries (FileNotFoundExceptions)
  • HiveMetaStore quickly became a bottleneck
    • REFRESH TABLE / MSCK REPAIR TABLE / ALTER TABLE ADD PARTITIONS
• Logic became more complicated; the pipeline was less robust
• Fixing mistakes in the data became harder
What Happened?
Examples of Data Mistakes
• A field's unit changed from MB to GB
• Schema inference / mismatch
  • An integer column in JSON started getting inferred as longs after a Spark upgrade. Some Parquet files had ints, some had longs
  • A different type of log got introduced to the system. All of a sudden, a table with 8 columns had 32 new columns introduced
  • A JSON column was written with two different cases: "Event" instead of "event". Parquet could no longer read the values written as "Event"
• Garbage data caused by partial failures
Problems in Data Pipelines

• Correctness
  • Lack of atomicity leads to pockets of duplicate data
  • Bookkeeping of what to process is tedious – late / out-of-order data
  • Schema management
  • Maintaining data hygiene – checks/corrections
• Performance
  • Listing files on blob storage systems is slow
  • Lots of small files hurt query performance
  • HiveMetaStore experience is horrendous
    • Doesn't scale well
    • Having to call MSCK REPAIR TABLE and REFRESH TABLE all the time
Enter Delta Lake

• Designed for data lake architectures
• No lock-in – data in open Parquet format
• No dependency on HiveMetaStore – manages metadata internally
• Scales to billions of partitions and/or files
• Supports batch/stream reads/writes
• ACID transactions
• Schema enforcement
• Tables auto-update and provide snapshot isolation
• Query old versions of your table: Time Travel (sketch below)
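A minimal sketch of what a few of these read paths look like from Spark, assuming the Delta Lake package is available to the session; the table path and version number are hypothetical examples, not from the talk:

```python
# Minimal sketch (hypothetical path/version): batch reads, stream reads,
# and Time Travel against one Delta table. Assumes the Delta Lake package
# (io.delta:delta-core) is on the Spark classpath.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-reads").getOrCreate()

# Batch read of the table's current snapshot
current = spark.read.format("delta").load("/tables/events")

# Streaming read: newly committed files arrive as streaming records
stream = spark.readStream.format("delta").load("/tables/events")

# Time Travel: query the table as of an earlier version
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tables/events")
```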
Data Pipelines @ Databricks

[Diagram: event-based ingestion flows into Bronze tables, then Silver tables, then Gold tables, which serve Reporting and Streaming Analytics]
Properties of Bronze/Silver/Gold

• Bronze tables
  • No data processing
  • Deduplication + JSON => Parquet conversion (see the streaming sketch after this list)
  • Data kept around for a couple of weeks in order to fix mistakes, just in case
• Silver tables
  • Directly queryable tables
  • PII masking/redaction
• Gold tables
  • Materialized/summary views of silver tables
  • Tables curated by the Data Science team
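As a rough sketch of how the Bronze and Silver hops might be wired up with Structured Streaming; the paths, schema, and `eventId` dedup key are illustrative assumptions, not the production pipeline:

```python
# Hypothetical Bronze/Silver sketch: raw JSON lands in a Bronze Delta table
# untouched, then a second stream deduplicates it into a queryable Silver table.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("bronze-silver").getOrCreate()

schema = StructType([
    StructField("eventId", StringType()),
    StructField("eventType", StringType()),
    StructField("ts", TimestampType()),
])

# Bronze: JSON => Parquet-backed Delta, no business logic applied
(spark.readStream.schema(schema).json("/raw/events")
      .writeStream.format("delta")
      .option("checkpointLocation", "/checkpoints/bronze")
      .start("/tables/bronze_events"))

# Silver: deduplicated and directly queryable
(spark.readStream.format("delta").load("/tables/bronze_events")
      .dropDuplicates(["eventId"])
      .writeStream.format("delta")
      .option("checkpointLocation", "/checkpoints/silver")
      .start("/tables/silver_events"))
```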
Correctness Problems in Data Pipelines

• Lack of atomicity leads to pockets of duplicate data
  • Delta provides ACID transactions and snapshot isolation
• Bookkeeping of what to process is tedious – late / out-of-order data
  • Structured Streaming handles this. Streaming into Delta provides exactly-once semantics
• Schema management
  • Delta manages the schema of the table internally and allows "safe" (opt-in) evolutions (sketch below)
• Maintaining data hygiene – checks/corrections
  • Delta supports DELETE / UPDATE to delete/fix records (coming soon to OSS)
  • Delta supports invariants (NOT NULL, enum in ('A', 'B', 'C'))
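A small sketch of the enforcement-vs-opt-in-evolution behavior; the table path and column names are hypothetical:

```python
# Sketch of schema enforcement vs. opt-in evolution (hypothetical path/columns).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("schema-demo").getOrCreate()

# A batch that carries a column the table doesn't have yet ("region")
new_df = spark.createDataFrame(
    [("e1", "login", "us-east-1")],
    ["eventId", "eventType", "region"],
)

# Enforcement: by default, the mismatched append is rejected
try:
    new_df.write.format("delta").mode("append").save("/tables/silver_events")
except Exception as err:
    print("write rejected:", err)

# Evolution: explicitly opt in, and the new column is added to the table's schema
(new_df.write.format("delta")
       .mode("append")
       .option("mergeSchema", "true")
       .save("/tables/silver_events"))
```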
Performance Problems in Data Pipelines

• Listing files on blob storage systems is slow
  • Delta doesn't need to list files; it keeps file information in its state
• Lots of small files hurt query performance
  • Delta's OPTIMIZE command compacts data without affecting in-flight queries (sketch below)
• HiveMetaStore experience is horrendous
  • Delta uses Spark jobs to compute its state, so metadata handling is scalable!
  • Delta auto-updates tables, so you don't need REFRESH TABLE / MSCK REPAIR TABLE / ALTER TABLE ADD PARTITIONS, etc.
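For the small-files point, a hedged sketch: at the time of this talk, OPTIMIZE was a Databricks SQL command rather than part of OSS Delta, and the table path here is a hypothetical example:

```python
# Hedged sketch: compact many small files into larger ones. In-flight queries
# keep reading the pre-compaction snapshot. OPTIMIZE was Databricks-only
# when this talk was given; the table path is hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compaction").getOrCreate()

spark.sql("OPTIMIZE delta.`/tables/silver_events`")
```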
Get Started with Delta using Spark APIs

Instead of parquet...

CREATE TABLE ...
USING parquet
...

dataframe.write.format("parquet").save("/data")

... simply say delta:

CREATE TABLE ...
USING delta
...

dataframe.write.format("delta").save("/data")
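Putting the slide's two snippets together, a minimal end-to-end sketch; the path and table name are hypothetical:

```python
# Minimal end-to-end sketch (hypothetical path/table name): write a DataFrame
# as a Delta table, register it for SQL users, and query it back.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("get-started").getOrCreate()

df = spark.range(0, 5)  # toy data

# The only change from a Parquet pipeline: format("delta") instead of format("parquet")
df.write.format("delta").save("/data/numbers")

# Register the path as a table for SQL users
spark.sql("CREATE TABLE numbers USING delta LOCATION '/data/numbers'")
spark.sql("SELECT * FROM numbers").show()
```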
Tutorial notebook at:
https://docs.delta.io/notebooks/sais19-tutorial.ipynb
https://docs.delta.io/notebooks/sais19-tutorial.dbc
Roadmap for Delta Lake
• Support for AWS S3 and Microsoft Azure file systems
• Scala APIs for DELETE / UPDATE / MERGE
• Support for VACUUM and CONVERT TO DELTA (sketch below)
• Expectations / Invariants
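For reference, a hedged sketch of what the VACUUM and CONVERT TO DELTA items look like as SQL once available; these were roadmap items, not yet in OSS Delta at the time of the talk, and the path is hypothetical:

```python
# Hedged sketch of two roadmap items, issued as SQL through Spark.
# Not yet available in OSS Delta when this talk was given; path is hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("roadmap").getOrCreate()

# Convert an existing Parquet directory into a Delta table in place
spark.sql("CONVERT TO DELTA parquet.`/data/events`")

# Remove files no longer referenced by the table and older than 7 days
spark.sql("VACUUM delta.`/data/events` RETAIN 168 HOURS")
```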
Summary

• File listing and many small files hurt performance
  • Delta Lake avoids file listing
• HiveMetaStore can become the bottleneck for large tables
  • Delta uses Spark jobs to manage its metadata, scaling to billions of files
  • Delta auto-updates => no need to call REFRESH TABLE with Spark
  • No need to add/remove partitions; no need for MSCK REPAIR TABLE
• Partial / distributed failures can taint tables
  • Delta's ACID transactions guard us against garbage data
  • Always get a consistent view of your table with Delta
• Schema management and data hygiene are hard problems
  • Delta has built-in schema management to only allow safe changes
  • The Delta architecture (Bronze-Silver-Gold tables), combined with Delta, makes backfills and corrections easier
  • Delta's support for time travel makes corrections effortless
• Bringing a traditional data warehouse feel to data lakes
  • Fine-grained data modification capabilities through UPDATE / DELETE / MERGE (coming)
Thank You

"Do you have any questions for my prepared answers?"
– Henry Kissinger