Implementing MERGE INTO for Apache Iceberg with DataFusion
Jan Kaul
What is MERGE INTO?
MERGE INTO is a SQL statement that conditionally combines INSERT, UPDATE, and DELETE operations into a single, atomic operation. Think of it as the analytical equivalent to UPSERT, but designed specifically for bulk operations on millions of rows rather than individual records. While transactional databases commonly perform single-row operations, analytical workloads need to process data in bulk for efficiency. The MERGE INTO statement executes all modifications atomically based on a match condition, making it essential for data pipelines that need to efficiently incorporate changes into existing datasets.
In this post, we’ll explore how we implemented MERGE INTO in Embucket for Iceberg tables using Apache DataFusion. We’ll walk through the technical challenges of implementing updates on immutable Parquet files and show how we optimized the implementation to minimize write amplification.
A Simple Example
To understand how MERGE INTO works in practice, let’s walk through a concrete example. Imagine we have a target table with 3 existing rows (ids 1-3), and we want to apply changes from a source table. The source table contains two rows: one with id=2 that already exists in the target (requiring an UPDATE), and one with id=4 that’s new (requiring an INSERT).
Target table
| id | name |
|---|---|
| 1 | Alice |
| 2 | Bob |
| 3 | Charlie |
Source table
| id | name |
|---|---|
| 2 | Robert |
| 4 | Eddy |
The MERGE INTO statement handles both cases in a single operation. The ON clause defines the match condition (target.id = source.id), while the WHEN MATCHED clause applies UPDATE logic for matching rows, and the WHEN NOT MATCHED clause applies INSERT logic for new rows.
MERGE INTO target USING source ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.name = source.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name);
After executing this statement, the result shows that id=2 has been updated to “Robert”, id=4 has been inserted as a new row, and ids 1 and 3 remain unchanged:
Result
| id | name |
|---|---|
| 1 | Alice |
| 2 | Robert |
| 3 | Charlie |
| 4 | Eddy |
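The same semantics can be sketched in a few lines of Python. This is only an illustration of what the statement computes, not how a query engine executes it; the `merge_into` function is a hypothetical helper, and it assumes `id` is unique in both tables.

```python
def merge_into(target, source):
    """Upsert `source` rows into `target` rows, matching on the `id` key."""
    merged = {row["id"]: dict(row) for row in target}  # copy rows; inputs stay untouched
    for src in source:
        if src["id"] in merged:
            # WHEN MATCHED THEN UPDATE SET target.name = source.name
            merged[src["id"]]["name"] = src["name"]
        else:
            # WHEN NOT MATCHED THEN INSERT (id, name)
            merged[src["id"]] = {"id": src["id"], "name": src["name"]}
    return sorted(merged.values(), key=lambda row: row["id"])


target = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Charlie"},
]
source = [{"id": 2, "name": "Robert"}, {"id": 4, "name": "Eddy"}]

result = merge_into(target, source)
for row in result:
    print(row)
```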
Why MERGE INTO Matters
MERGE INTO has become fundamental to modern data engineering workflows for several compelling reasons. First, it provides significant performance benefits compared to executing separate INSERT, UPDATE, and DELETE statements. By combining these operations into a single pass over the data, we avoid multiple scans and reduce I/O overhead. Additionally, MERGE INTO ensures atomicity—all changes either succeed or fail together, maintaining consistency even when processing millions of rows.
The operation is particularly valuable for several common use cases. Incremental dbt pipelines use MERGE INTO to efficiently update tables with only changed data, avoiding full table rewrites. When tracking historical changes through slowly changing dimensions (SCD Type 1 and Type 2), MERGE INTO provides the flexibility to handle both in-place updates and historical versioning. CDC (Change Data Capture) integration relies heavily on MERGE INTO to apply database change streams to data lakes, and it’s also essential for handling deduplication of late-arriving data in streaming scenarios.
To illustrate how MERGE INTO appears in production systems, here’s a real-world example from dbt incremental pipelines:
merge into <existing_table> DBT_INTERNAL_DEST
using <temp_table_with_new_records> DBT_INTERNAL_SOURCE
on
-- unique key
DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id
and
-- custom predicate: limits data scan in the "old" data / existing table
DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)
when matched then update ...
when not matched then insert ...
Notice how dbt uses additional predicates to limit the data scan in the existing table, further optimizing performance by reducing the amount of data that needs to be processed.
The Technical Challenge
Implementing MERGE INTO on Iceberg requires understanding how Iceberg handles data at a fundamental level. The core challenge we face is deceptively simple: how do we implement updates on immutable files?
Iceberg and Immutable Files
Iceberg is an open table format that adds a metadata layer on top of data files. It provides ACID transactions, time travel, and schema evolution for data lakes, features traditionally associated only with databases. Iceberg data files are most commonly stored as Parquet (the format also supports ORC and Avro), a columnar format optimized for analytical read performance.
However, there’s a key constraint we must work with: parquet files are immutable. Once written, the records in a parquet file cannot be updated in-place. This creates a fundamental tension in our design. Analytical formats like parquet prioritize read performance through columnar layout, compression, and immutability. These properties enable the blazingly fast scans that analytical workloads demand. Yet MERGE INTO, by its very nature, requires modifications—updates and deletes—which parquet simply doesn’t support. Traditional databases solve this problem with in-place updates, but that approach isn’t available to us.
Iceberg’s solution to this challenge is its metadata layer, which tracks file versions and snapshots. A snapshot represents a point-in-time view of the table, referencing a specific set of data files. To “update” data in Iceberg, we don’t modify existing files. Instead, we write new files with the updated content and commit a new snapshot that references these new files and no longer references the old ones. The old files remain in storage, reachable through earlier snapshots for time-travel queries, but are no longer part of the current table view.
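A heavily simplified model of this idea is sketched below. The `Snapshot` class and `commit_rewrite` function are hypothetical; real Iceberg snapshots point to manifest lists and manifests rather than holding file paths directly, and their ids are not sequential. The file names are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: frozenset  # paths of the data files visible in this snapshot


def commit_rewrite(current, removed, added):
    """Build the next snapshot; `current` is never modified."""
    files = (current.data_files - frozenset(removed)) | frozenset(added)
    return Snapshot(current.snapshot_id + 1, files)


history = [Snapshot(1, frozenset({"a.parquet", "b.parquet"}))]
# "Update" rows stored in b.parquet: write c.parquet, then swap the references.
history.append(commit_rewrite(history[-1], removed={"b.parquet"}, added={"c.parquet"}))

print(sorted(history[-1].data_files))  # current table view
print(sorted(history[0].data_files))   # time travel to the first snapshot
```

Because the first snapshot is left untouched, a reader pinned to it still sees `b.parquet`, which is what makes time travel possible.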
Copy-On-Write Strategy
When it comes to handling updates in Iceberg, we have two strategic options: Copy-On-Write (COW) and Merge-On-Read (MOR). Each approach has distinct trade-offs that affect both implementation complexity and runtime performance.
With Copy-On-Write—the strategy we chose for our implementation—we rewrite an entire data file whenever any row in that file changes. This approach has clear advantages: the implementation is straightforward, and reads are fast since there’s no need to look up and merge delete files. However, it comes with a significant downside: write amplification. When you update a small portion of a large file containing millions of rows, you still need to rewrite the entire file, which can be wasteful.
The alternative, Merge-On-Read, writes separate delete files to mark removed records rather than rewriting data files. This makes writes more efficient—you only write the changed data. But it comes at the cost of read complexity: queries must now merge data files with delete files to get the correct results, adding overhead to the read path.
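To make the read-side cost concrete, here is a minimal sketch of a Merge-On-Read scan using position-style deletes. The `scan` function, the dictionary layout, and the file names are assumptions for illustration; they are not Iceberg’s actual delete file format.

```python
def scan(data_files, position_deletes):
    """Read all data files, skipping row positions listed in delete files."""
    rows = []
    for path, file_rows in data_files.items():
        deleted = position_deletes.get(path, set())
        rows.extend(r for pos, r in enumerate(file_rows) if pos not in deleted)
    return rows


data_files = {"data-1.parquet": [(1, "Alice"), (2, "Bob"), (3, "Charlie")]}

# Update id=2 without rewriting data-1.parquet:
# mark the old row (0-based position 1) as deleted and write the new row to a small file.
position_deletes = {"data-1.parquet": {1}}
data_files["data-2.parquet"] = [(2, "Robert")]

rows = sorted(scan(data_files, position_deletes))
print(rows)
```

The write touched only one row, but every subsequent scan has to consult the delete set for each file it reads.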
We chose Copy-On-Write for our implementation because it provides a simpler read path and more predictable query performance. However, this choice makes minimizing write amplification the key challenge in our implementation—we must identify and rewrite only the files that are actually affected by the merge operation.
To visualize how Copy-On-Write works, consider this simple update operation:
update target
set name = 'Robert'
where id = 2
Original file:
| id | name |
|---|---|
| 1 | Alice |
| 2 | Bob |
| 3 | Charlie |
New rewritten file:
| id | name |
|---|---|
| 1 | Alice |
| 2 | Robert |
| 3 | Charlie |
Even though only a single row changed (id=2), the entire file must be rewritten with all three rows. This is the write amplification we need to minimize.
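The cost can be expressed as a simple ratio of rows written to rows changed. The sketch below is a toy model with a hypothetical `copy_on_write_update` helper that treats a file as a list of tuples. It assumes at least one row matches the predicate.

```python
def copy_on_write_update(file_rows, predicate, update):
    """Rewrite a whole file, applying `update` to rows that satisfy `predicate`."""
    new_rows = [update(r) if predicate(r) else r for r in file_rows]
    changed = sum(1 for r in file_rows if predicate(r))
    return new_rows, changed


original = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
rewritten, changed = copy_on_write_update(
    original,
    predicate=lambda r: r[0] == 2,
    update=lambda r: (r[0], "Robert"),
)

# Rows written per row actually changed (assumes changed > 0).
amplification = len(rewritten) / changed
print(rewritten, amplification)
```

With three rows per file the ratio is small, but it grows linearly with file size when only a single row changes.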
Our Implementation
Our goal was to implement MERGE INTO for Iceberg tables using the Copy-On-Write strategy we just discussed. The core challenge is identifying which parquet files need rewriting without scanning the entire table—a crucial optimization when dealing with tables containing thousands of parquet files and billions of rows.
Our solution is to track file provenance through the query execution pipeline using auxiliary metadata columns. We chose Apache DataFusion as our query engine because of its extensibility and excellent integration with the Arrow ecosystem, which makes it straightforward to inject custom metadata and build specialized operators.
Our implementation consists of five main steps: first, we enrich scanned data with file tracking metadata; second, we join the target and source tables using a full outer join; third, we apply the matching expressions to determine UPDATE/INSERT/unchanged logic for each row; fourth, we filter to keep only affected files; and finally, we write new parquet files and update the Iceberg snapshot. Let’s walk through each of these steps with a concrete example to see how they work together.
Example Setup
We’ll use the same merge scenario from our earlier example, but now we’ll examine the physical file layout to understand how our implementation tracks and optimizes file rewrites. The target table contains 3 rows distributed across 2 parquet files: 1.parquet (containing just Alice) and 2.parquet (containing Bob and Charlie). The source table has 2 rows: one matching record (id=2 for Robert) and one new record (id=4 for Eddy). Our merge condition is straightforward: target.id = source.id.
The crucial insight here is that we need to track which parquet files contain rows affected by the merge. This tracking allows us to avoid rewriting files that aren’t impacted, significantly reducing write amplification. Here’s the initial state:
Target: 1.parquet
| id | name |
|---|---|
| 1 | Alice |
2.parquet
| id | name |
|---|---|
| 2 | Bob |
| 3 | Charlie |
Source:
| id | name |
|---|---|
| 2 | Robert |
| 4 | Eddy |
Step 1: Enrich Scanned Data
The first step in our implementation is to inject auxiliary metadata columns during the table scan phase. We add three special columns to track critical information throughout the query execution. The __data_file_path column records which parquet file each target row came from—this is the key to our optimization strategy. We also add boolean marker columns: __target to identify rows from the target table and __source to identify rows from the source table.
These columns propagate through the entire query execution pipeline, flowing through joins, filters, and projections. By preserving this metadata all the way through to the final stages, we can identify exactly which files are affected by the merge operation.
After the scan phase, our data looks like this:
Target:
| __data_file_path | id | name | __target |
|---|---|---|---|
| 1.parquet | 1 | Alice | true |
| 2.parquet | 2 | Bob | true |
| 2.parquet | 3 | Charlie | true |

Source:
| id | name | __source |
|---|---|---|
| 2 | Robert | true |
| 4 | Eddy | true |
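As a rough illustration of the enrichment, the following Python sketch tags rows during a scan. It models files as in-memory lists of dicts; `scan_target` and `scan_source` are hypothetical stand-ins, not the DataFusion operators used in the actual implementation.

```python
def scan_target(files):
    """Yield target rows tagged with their originating file and a __target marker."""
    for path, rows in files.items():
        for row in rows:
            yield {"__data_file_path": path, **row, "__target": True}


def scan_source(rows):
    """Yield source rows tagged with a __source marker."""
    for row in rows:
        yield {**row, "__source": True}


target_files = {
    "1.parquet": [{"id": 1, "name": "Alice"}],
    "2.parquet": [{"id": 2, "name": "Bob"}, {"id": 3, "name": "Charlie"}],
}
source_rows = [{"id": 2, "name": "Robert"}, {"id": 4, "name": "Eddy"}]

target_scan = list(scan_target(target_files))
source_scan = list(scan_source(source_rows))
for row in target_scan + source_scan:
    print(row)
```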
Step 2: Join Target & Source
Next, we perform a full outer join on the merge condition (target.id = source.id). It’s crucial that we use a full outer join rather than an inner join because we need to preserve all rows from both sides: matched rows, target-only rows, and source-only rows. This is essential for Copy-On-Write semantics—unmatched target rows must be kept and rewritten in any affected files to maintain data completeness.
The join produces three distinct types of rows. Matched rows have both __target and __source set to true, representing the UPDATE case where a row exists in both tables. Target-only rows have __target set to true but null source columns, representing unchanged rows that need to be preserved. Source-only rows have __source set to true but no __data_file_path, representing the INSERT case for new records.
The result of our join looks like this:
| __data_file_path | id | t.name | __target | s.name | __source |
|---|---|---|---|---|---|
| 1.parquet | 1 | Alice | true | | |
| 2.parquet | 2 | Bob | true | Robert | true |
| 2.parquet | 3 | Charlie | true | | |
| | 4 | | | Eddy | true |
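The join can be sketched as a small hash-based full outer join over the enriched rows. This is an illustrative stand-in for the engine’s join operator, not its implementation; `full_outer_join` is a hypothetical helper and assumes `id` is unique on the source side.

```python
def full_outer_join(target_rows, source_rows):
    """Hash-based full outer join on `id`; assumes ids are unique on the source side."""
    source_by_id = {s["id"]: s for s in source_rows}
    matched = set()
    joined = []
    for t in target_rows:
        s = source_by_id.get(t["id"])
        if s is not None:
            matched.add(s["id"])
        joined.append({
            "__data_file_path": t["__data_file_path"],
            "id": t["id"],
            "t.name": t["name"],
            "__target": True,
            "s.name": s["name"] if s else None,
            "__source": True if s else None,
        })
    # Source rows with no target match: the INSERT candidates.
    for s in source_rows:
        if s["id"] not in matched:
            joined.append({
                "__data_file_path": None, "id": s["id"], "t.name": None,
                "__target": None, "s.name": s["name"], "__source": True,
            })
    return joined


target_scan = [
    {"__data_file_path": "1.parquet", "id": 1, "name": "Alice", "__target": True},
    {"__data_file_path": "2.parquet", "id": 2, "name": "Bob", "__target": True},
    {"__data_file_path": "2.parquet", "id": 3, "name": "Charlie", "__target": True},
]
source_scan = [
    {"id": 2, "name": "Robert", "__source": True},
    {"id": 4, "name": "Eddy", "__source": True},
]

joined = full_outer_join(target_scan, source_scan)
for row in joined:
    print(row)
```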
Step 3: Apply Matching Expressions
Now we apply the MERGE INTO logic using a CASE expression that handles each row type appropriately. For rows where both __source and __target are true (our WHEN MATCHED case), we apply the UPDATE expression to merge the source data into the target schema. For rows where only __source is true (WHEN NOT MATCHED BY TARGET), we apply the INSERT expression to add the new row. For rows with only __target (WHEN NOT MATCHED BY SOURCE), we keep the row unchanged—or apply a DELETE if the MERGE statement specified that behavior.
The result is a unified output stream containing updated, inserted, and unchanged rows. Crucially, we preserve the __data_file_path and __source marker columns at this stage—we’ll need them in the next step for filtering.
CASE
WHEN __source = true AND __target = true THEN <match_expression>
WHEN __source = true AND __target IS NULL THEN <not_match_expression>
ELSE target
END
After applying these expressions, our data looks like this:
| __data_file_path | id | name | __source |
|---|---|---|---|
| 1.parquet | 1 | Alice | |
| 2.parquet | 2 | Robert | true |
| 2.parquet | 3 | Charlie | |
| | 4 | Eddy | true |
Notice that the row with id=2 has been updated to “Robert” and now has __source set to true, indicating it was affected by the merge. The row with id=4 is a new insert, also marked with __source set to true.
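A Python stand-in for the CASE expression, specialised to this example’s UPDATE and INSERT clauses, might look like the sketch below. `apply_merge` and `joined_row` are hypothetical helpers; the real implementation evaluates the expressions from the MERGE statement rather than hard-coding them.

```python
def apply_merge(row):
    """Python stand-in for the CASE expression, specialised to this example."""
    if row["__source"] and row["__target"]:
        name = row["s.name"]   # WHEN MATCHED: UPDATE SET name = source.name
    elif row["__source"] and row["__target"] is None:
        name = row["s.name"]   # WHEN NOT MATCHED: INSERT (id, name)
    else:
        name = row["t.name"]   # target-only row: keep unchanged
    return {
        "__data_file_path": row["__data_file_path"],
        "id": row["id"],
        "name": name,
        "__source": row["__source"],
    }


def joined_row(path, id_, t_name, target, s_name, source):
    """Helper to keep the sample data compact (hypothetical, for illustration)."""
    return {"__data_file_path": path, "id": id_, "t.name": t_name,
            "__target": target, "s.name": s_name, "__source": source}


joined = [
    joined_row("1.parquet", 1, "Alice", True, None, None),
    joined_row("2.parquet", 2, "Bob", True, "Robert", True),
    joined_row("2.parquet", 3, "Charlie", True, None, None),
    joined_row(None, 4, None, None, "Eddy", True),
]

merged = [apply_merge(row) for row in joined]
for row in merged:
    print(row)
```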
Step 4: Filter Out Unneeded Files
This is where our file tracking strategy delivers its key optimization. We identify affected files by looking for any file that contains at least one row where __source is true. Files without any source matches are completely unaffected by the merge and don’t need rewriting at all.
Our filter logic keeps only rows from files that have __source = true somewhere in them, plus all INSERT rows (which don’t have a file path yet). This reduces write amplification by avoiding unnecessary file rewrites. In our example, 1.parquet contains only Alice’s row, which has no __source marker, so the entire file is filtered out and will be reused as-is in the new snapshot. Meanwhile, 2.parquet contains a row with __source = true (the updated Robert row), so we keep every row from that file for rewriting, including the unchanged Charlie row.
Unaffected files remain referenced in the new Iceberg snapshot without any I/O—we simply reuse the existing file reference in the metadata. This is the core optimization that makes our Copy-On-Write implementation efficient.
| __data_file_path | id | name | __source |
|---|---|---|---|
| 2.parquet | 2 | Robert | true |
| 2.parquet | 3 | Charlie | |
| | 4 | Eddy | true |
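The filter can be sketched as a two-pass operation over materialized rows: first collect the affected file paths, then keep the matching rows. The `keep_affected` function is hypothetical, and a streaming engine would need a different mechanism than two passes over a list, so treat this purely as an illustration of the rule.

```python
def keep_affected(rows):
    """Keep INSERT rows plus every row of any file touched by a source row."""
    affected_files = {
        r["__data_file_path"]
        for r in rows
        if r["__source"] and r["__data_file_path"] is not None
    }
    kept = [
        r for r in rows
        if r["__data_file_path"] is None or r["__data_file_path"] in affected_files
    ]
    return kept, affected_files


merged = [
    {"__data_file_path": "1.parquet", "id": 1, "name": "Alice", "__source": None},
    {"__data_file_path": "2.parquet", "id": 2, "name": "Robert", "__source": True},
    {"__data_file_path": "2.parquet", "id": 3, "name": "Charlie", "__source": None},
    {"__data_file_path": None, "id": 4, "name": "Eddy", "__source": True},
]

kept, affected_files = keep_affected(merged)
print(affected_files)
for row in kept:
    print(row)
```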
Step 5: Remove Auxiliary Columns
In the final step, we clean up by dropping the auxiliary columns (__data_file_path, __source, __target) and projecting only the original table schema columns. We then write these cleaned rows to new parquet files.
Simultaneously, we commit a new Iceberg snapshot that reflects the changes. It drops the old affected files (2.parquet in our example), adds the newly written file (let’s call it 3.parquet, containing the merged results), and keeps the unaffected files (1.parquet) exactly as they were. The old files remain in storage to support time-travel queries; they’re just no longer part of the current snapshot.
The final table state consists of 1.parquet (the original file, completely unchanged) plus 3.parquet (the new file containing the updates and inserts). This gives us the complete merged result while minimizing write amplification.
New file (3.parquet):
| id | name |
|---|---|
| 2 | Robert |
| 3 | Charlie |
| 4 | Eddy |
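This last step can be sketched as a projection plus a set operation on file references. The `finalize` function below is a hypothetical helper: it treats the snapshot as a plain set of file names, assumes a single output file, and assumes the filtered row set is non-empty.

```python
def finalize(rows, current_files, new_file):
    """Drop auxiliary columns and compute the file set of the next snapshot."""
    # Files whose rows we are rewriting must leave the current snapshot.
    rewritten = {r["__data_file_path"] for r in rows if r["__data_file_path"] is not None}
    # Project away every auxiliary column (all of them start with "__").
    data = [{k: v for k, v in r.items() if not k.startswith("__")} for r in rows]
    next_files = (set(current_files) - rewritten) | {new_file}
    return data, next_files


kept = [
    {"__data_file_path": "2.parquet", "id": 2, "name": "Robert", "__source": True},
    {"__data_file_path": "2.parquet", "id": 3, "name": "Charlie", "__source": None},
    {"__data_file_path": None, "id": 4, "name": "Eddy", "__source": True},
]

data, next_files = finalize(kept, current_files={"1.parquet", "2.parquet"}, new_file="3.parquet")
print(data)
print(sorted(next_files))
```

Here `data` holds the rows that would be written to 3.parquet, and `next_files` is the set of files the new snapshot references.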
Conclusion
MERGE INTO is essential for modern analytical data pipelines, enabling efficient incremental updates in dbt workflows, CDC integration, and slowly changing dimension management. The core challenge we addressed was implementing updates on immutable parquet files in Iceberg tables—a problem that requires careful architectural consideration.
Our solution tracks file provenance using auxiliary columns throughout the query execution pipeline. This seemingly simple approach enables a critical optimization: we only rewrite affected files rather than the entire table. By identifying exactly which parquet files contain rows impacted by the merge, we dramatically reduce write amplification while maintaining the simplicity and predictable performance of the Copy-On-Write strategy.
DataFusion’s extensibility proved invaluable for this implementation, making it straightforward to inject metadata columns and build the custom operators we needed. The approach balances write efficiency with read performance, keeping queries fast while avoiding unnecessary file rewrites.
That said, it’s worth acknowledging the key trade-off: write amplification remains a concern for small updates to large files, though our selective rewriting approach minimizes it as much as possible under Copy-On-Write semantics. Future work could explore hybrid approaches that combine aspects of both Copy-On-Write and Merge-On-Read, or partition-level optimizations tailored to specific workload patterns. Nonetheless, this implementation provides a solid foundation for MERGE INTO operations on Iceberg tables that performs well across a wide range of real-world data engineering scenarios.