DeltaLake (delta.io) cost optimisation on AWS S3
Introduction
Our goal is to build a data platform that serves the OLAP needs of the entire organisation.
Record-level upserts are supported on Apache Hive through Hive ACID, and the equivalent on Parquet files is provided by DeltaLake or Apache Hudi. DeltaLake works with both the Spark batch and streaming APIs.
As our organisation uses the AWS public cloud, we decided to go with Databricks on AWS, which supports DeltaLake as part of its platform. Databricks can spin up on-demand Spark clusters on cloud infrastructure and covers our batch, real-time, ad-hoc querying and visualisation requirements.
Solution
Our existing OLTP applications are built on top of MongoDB and PostgreSQL. To offload OLAP queries and applications from the OLTP systems and load the data platform in near real time, we stream Change Data Capture (CDC) events from both MongoDB and PostgreSQL using Debezium.
We use Spark Structured Streaming to capture these events from the OLTP systems and load them to AWS S3. Along the way we addressed a few issues related to AWS cost. DeltaLake writes data in Apache Parquet format and maintains multiple versions of files to support data upserts.
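For context, here is a minimal sketch of the ingestion path in its simplest form, assuming the Debezium connectors publish to Kafka; the topic, bucket and checkpoint names are placeholders, and our actual jobs upsert per table via the Processors and DeltaTable.merge calls described below rather than appending raw events.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cdc-ingest").getOrCreate()

# Read Debezium CDC events from Kafka (topic name is a placeholder;
# the Kafka connector package is assumed to be available on the cluster).
cdc_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "pg.public.orders")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")  # Debezium envelope as JSON text
)

# Land the events in a DeltaLake table on S3.
query = (
    cdc_events.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/orders")
    .start("s3://my-bucket/delta/orders_raw")
)
```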
AWS charges for every S3 request, including GetObject and ListObjects calls on S3 buckets. DeltaLake APIs such as the ones below internally call these AWS S3 APIs.
1. DeltaTable.forPath
2. DeltaTable.merge
3. DeltaTable.vacuum
As we load data from PostgreSQL tables and MongoDB collections, we wrote a Processor per table/collection. Each Processor takes care of parsing the JSON message from Debezium (each message represents a single CRUD operation on PostgreSQL/MongoDB).
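As an illustration, here is a simplified sketch of such a Processor for a hypothetical orders table, assuming the Debezium envelope exposes op/before/after fields and that the input DataFrame carries the raw message in a json string column; the real envelope schema and upsert logic are table specific.

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical Debezium envelope schema for an "orders" table.
ENVELOPE_SCHEMA = StructType([
    StructField("op", StringType()),      # c = create, u = update, d = delete
    StructField("before", StringType()),  # record state before the change (JSON)
    StructField("after", StringType()),   # record state after the change (JSON)
])

class OrdersProcessor:
    """Parses Debezium messages for one table/collection and applies them to its delta table."""

    def __init__(self, spark: SparkSession, table_path: str):
        self.spark = spark
        self.table_path = table_path

    def parse(self, raw_df: DataFrame) -> DataFrame:
        # Each message is a single CRUD operation encoded as a Debezium JSON envelope.
        return (
            raw_df
            .select(from_json(col("json"), ENVELOPE_SCHEMA).alias("evt"))
            .select("evt.op", "evt.before", "evt.after")
        )

    def process(self, raw_df: DataFrame, batch_id: int) -> None:
        parsed = self.parse(raw_df)
        # The parsed changes are then merged into the delta table at self.table_path;
        # the merge itself is sketched later in the article.
```

A Processor is wired into the stream with writeStream.foreachBatch(processor.process), so each micro-batch is parsed and applied per table.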
Optimisation
Below are the optimisations we applied to reduce AWS cost.
- DeltaTable initialisation scans the <DeltaTable_Path>/_delta_log/ directory, which internally calls AWS S3 ListObjects. So invoke the DeltaTable.forPath API once, at Spark job initialisation, and reuse the handle (see the sketch after this list).
- Initially we called the DeltaTable.vacuum API at the end of every micro-batch to purge old versions of delta table data; this internally calls the AWS S3 GetObject, ListObjects and DeleteObjects APIs. We optimised this by running DeltaTable.vacuum only once a day.
- We run the OPTIMIZE command at a configured time interval, like VACUUM above, which compacts small files into files of roughly 128 MB.
- The DeltaTable.merge API (the core API) performs the actual insert/update/delete of particular record(s).
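A minimal sketch of the first three optimisations above, assuming a placeholder table path and a simple time check in place of our actual scheduler:

```python
import time
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-maintenance").getOrCreate()
TABLE_PATH = "s3://my-bucket/delta/orders"  # placeholder path

# 1. Initialise DeltaTable once at job start and reuse it across micro-batches,
#    so the _delta_log/ listing (S3 ListObjects) is not repeated on every batch.
delta_table = DeltaTable.forPath(spark, TABLE_PATH)

ONE_DAY = 24 * 60 * 60
last_housekeeping = time.time()

def run_daily_housekeeping():
    """Run VACUUM and OPTIMIZE once a day instead of after every micro-batch."""
    global last_housekeeping
    if time.time() - last_housekeeping < ONE_DAY:
        return
    # 2. Purge old file versions; 168 hours keeps the default 7-day retention.
    delta_table.vacuum(168)
    # 3. Compact small files into larger ones (Delta/Databricks OPTIMIZE command).
    spark.sql(f"OPTIMIZE delta.`{TABLE_PATH}`")
    last_housekeeping = time.time()
```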
Coming back to DeltaTable.merge: from Debezium we get the complete record for insert/update operations on PostgreSQL, but only a partial record for certain update cases on MongoDB.
Even though we created partitioned tables on Databricks, for some MongoDB updates (such as $set) we did not always receive the partition column value. This forced DeltaTable.merge to do a full scan on AWS S3 to find the old record to update, with a huge impact on both processing time and cost.
We need to provide the table's partition column values in the merge condition for update operations, so that DeltaTable.merge can prune partitions and avoid full scans (a sketch follows below).
In our case, for partial MongoDB records we look up the source (MongoDB) by ObjectId to fetch the complete record attributes, and then load the latest record to AWS S3.
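A minimal sketch of such a partition-pruned merge, reusing the delta_table handle created at job initialisation in the earlier sketch; the region partition column, the id key and the omission of delete handling are assumptions for illustration:

```python
from pyspark.sql import DataFrame

def upsert_batch(batch_df: DataFrame, batch_id: int) -> None:
    """Merge one micro-batch of parsed CDC records into the delta table."""
    (
        delta_table.alias("t")
        .merge(
            batch_df.alias("s"),
            # Including the partition column in the condition lets DeltaTable.merge
            # prune partitions instead of scanning the whole table on S3.
            "t.region = s.region AND t.id = s.id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

This function is then attached to the stream with writeStream.foreachBatch(upsert_batch).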
With the above optimisations, we saved ~55% of our daily AWS cost.