Postgres to Iceberg in 13 minutes: How Supermetal compares to Flink, Kafka Connect, and Spark

Supermetal recently introduced Iceberg sink support, so it was time for another look. This post continues my earlier analysis of their newly announced Kafka sink.

The goal was to compare Supermetal against established open-source tools for writing data to Iceberg: Kafka Connect, Flink, and Spark. I measured a realistic end-to-end pipeline, moving data from Postgres to Iceberg.

Since these workloads are typically used for data archival, where latency requirements are lenient, I did not compare latencies. Instead, I focused on throughput during the snapshotting phase.

To keep the comparison normalized, I excluded scale-out scenarios. The question was how much throughput a single node can achieve, with the same resource allocation across all tests.

Supermetal completed snapshotting in 13 minutes, while Flink took between 90 and 116 minutes, Kafka Connect required 120 minutes, and Spark exceeded 3 hours.

As the findings show, CDC performance was the biggest bottleneck, particularly for Flink and Kafka Connect/Debezium. Most tools keep sources and sinks strictly separated, which is a sound architectural principle. Supermetal takes a different approach: its Iceberg writer can toggle between configuration options depending on the CDC source phase (snapshotting vs. live). This resembles the sideways information-passing technique found in some databases rather than tight coupling.

Disclosure: This work was sponsored by Supermetal. All benchmarks were executed by me in my AWS account. All numbers and findings are shared as is.

Test setup

I used the TPC-H dataset with a scale factor (SF) of 50, which comprises eight tables of varying sizes. At SF=50, the largest table (lineitem) contains 300 million rows, and the second-largest (orders) holds 75 million rows.

The infrastructure setup included:

  • AWS RDS Aurora Postgres 16 Serverless, with a maximum of 48 ACUs.
  • AWS MSK 3.9 featuring three express.m7g.xlarge brokers.
  • AWS EKS 1.34 utilizing m8i.xlarge nodes (4 CPU cores, 16 GB RAM).
    • All workloads were primarily executed on a single node (configured to request 3 CPU cores and 13 GB of RAM). Flink’s TaskManager employed four task slots, while the Debezium Iceberg sink connector utilized four tasks. Spark operated with a single executor, leveraging all available resources.
  • Iceberg tables were powered by AWS Glue and AWS S3.

The versions used in the tests were:

  • Latest Supermetal build (provided as a Docker image by the Supermetal team).
  • Flink CDC 3.5.0 with Flink 1.20, deployed via the Flink Kubernetes Operator 1.13.
  • Debezium 3.4.3.Final with Kafka Connect 4.1.1, deployed using the Strimzi Operator 0.51.0.
  • Spark 4.1.1, deployed via the Spark Kubernetes Operator 0.8.0.
  • Flink, Kafka Connect, and Spark all utilized Iceberg 1.10.1 connectors.

Supermetal

Supermetal supports Postgres CDC sources and Iceberg sinks.

In contrast to Kafka Connect, Supermetal operates independently of Kafka or any external orchestrator, enabling direct data delivery from source to sink, with optional object storage buffering.

Supermetal is deployed with a JSON configuration file, which suits containerized workloads well. The configuration concisely describes the sources and sinks. Below is an example of a complete Postgres to Iceberg pipeline:

{
    "connectors": [
        {
            "id": "pg_to_iceberg",
            "source": {
                "postgres": {
                    "connection": {
                        "host": "$POSTGRES_HOST",
                        // rest of the connection details
                    },
                    "replication_type": {
                        "logical_replication": {}
                    },
                    "catalog": {
                        "name": "default",
                        "schemas": [
                            {
                                "name": "public",
                                "tables": [
                                    {"name": "lineitem"},
                                    // rest of the table names
                                ]
                            }
                        ]
                    }
                }
            },
            "sink": {
                "iceberg": {
                    "catalog": {
                        "glue": {
                            "warehouse": "s3://$ICEBERG_S3_BUCKET/",
                            "region": "$AWS_REGION"
                        }
                    },
                    "target_namespace": ["$ICEBERG_GLUE_NAMESPACE"],
                    "write_mode": {
                        "merge_on_read": {}
                    }
                }
            },
            "disabled": false
        }
    ]
}

This configuration covers the entire pipeline; no additional setup is required. Table names can also be discovered at runtime, so they don't have to be listed explicitly.
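For illustration only (assuming discovery is triggered by omitting the explicit table list, which is my reading of the runtime-discovery behavior; the exact syntax is not confirmed), the schema entry could then shrink to:

```json
"schemas": [
    { "name": "public" }  // no "tables" list: table names discovered at runtime
]
```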

The Iceberg sink accommodates various catalogs (including REST, AWS Glue, and AWS S3 Tables) and supports Iceberg V1, V2, or V3. Advanced configuration options include Append vs. Merge on Read, target file size for Parquet, and compression settings.

Notably, Supermetal automatically adjusts file flushing configurations based on the current phase:

  • During the snapshot phase, it adheres to the configured target file size (defaulting to 512 MB), ensuring that snapshot-phase files are sufficiently large and do not require compaction.
  • In the live CDC phase, the “flush_interval_ms” configuration (defaulting to 10 seconds) is activated, allowing for control over end-to-end latency.
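As a rough sketch of how these two knobs could sit side by side in the sink configuration ("flush_interval_ms" appears in the docs; the target file size field name here is my assumption, not confirmed syntax):

```json
"sink": {
    "iceberg": {
        "write_mode": { "merge_on_read": {} },
        "target_file_size": 536870912,  // hypothetical field name: 512 MB, applied during the snapshot phase
        "flush_interval_ms": 10000      // applied during the live CDC phase
    }
}
```

Supermetal picks whichever setting matches the current phase, so no reconfiguration is needed when the pipeline transitions from snapshot to live CDC.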

This is quite unique: other tools do not differentiate between lifecycle phases (snapshot vs. live CDC) at the Iceberg sink level, since the CDC and Iceberg connectors are fully decoupled.

Results

Supermetal synchronized the data in just 13 minutes. The BytesUploaded metric for the underlying S3 bucket is as follows:

Throughout the test, CPU and memory usage remained relatively low:

Supermetal maintained memory usage below 5% of the allocated resources, eliminating the need for any tuning. The snapshot phase benefited from both inter- and intra-table parallelization.

Note that Supermetal uses append-only mode during snapshotting: it does not track table-level keys or perform deduplication at that stage.

The file sizes for the largest table (lineitem) were optimal, aligning with the specified Parquet target size:

Flink

Apache Flink supports the Postgres CDC connector (via Flink CDC) and the Iceberg connector, which allows writing data from Postgres directly to Iceberg.

Alternatively, Apache Kafka can be used as an intermediary: CDC data is captured into topics, then consumed and written to Iceberg. Kafka Connect and Spark require this approach; Flink can bypass Kafka and write directly.

The Iceberg connector documentation recommends using Flink SQL to define tables, though this approach has its drawbacks:

  • All table schemas must be explicitly defined.
  • Each Postgres-to-Iceberg table pair needs its own SQL statement, which results in considerable boilerplate. Statement Sets can run these within the same Flink pipeline, but it is still added complexity.

Moreover, each table source requires a dedicated replication slot, which is inefficient and does not scale well.
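To make the boilerplate concrete, here is a sketch of the per-table SQL (connection details elided; table and slot names illustrative, option names from the Flink CDC and Iceberg connector documentation):

```sql
-- Repeated for every table: one CDC source, one Iceberg sink, one INSERT
CREATE TABLE lineitem_src (
    l_orderkey BIGINT,
    l_linenumber INT,
    -- ... remaining columns ...
    PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'schema-name' = 'public',
    'table-name' = 'lineitem',
    'slot.name' = 'lineitem_slot'  -- a dedicated replication slot per source table
    -- ... connection options ...
);

CREATE TABLE lineitem_sink (
    -- ... same columns ...
) WITH (
    'connector' = 'iceberg'
    -- ... catalog options ...
);

-- Statement Sets run all per-table INSERTs in one Flink job
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO lineitem_sink SELECT * FROM lineitem_src;
    -- ... one INSERT per table ...
END;
```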

However, the Iceberg connector has a powerful capability called Dynamic Sink: a single sink is registered, and data is dynamically routed to different tables, with table registration and schema evolution handled automatically.

DynamicRecord record = new DynamicRecord(
    TableIdentifier.of(glueDatabase, tableName), // target Iceberg table
    "main",                                      // branch to write to
    tableDef.schema(),
    rowData,
    PartitionSpec.unpartitioned(),
    distributionMode,
    WRITE_PARALLELISM
);

This construct wraps Flink’s RowData with additional metadata for routing purposes.

It’s also important to note that the Flink Iceberg sink only flushes data on checkpoints, making the checkpoint interval the main knob for controlling flush size.
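In practice, this means the flush cadence is set via the checkpointing configuration, e.g.:

```yaml
# Flink configuration: the Iceberg sink commits files only at checkpoints,
# so this interval is effectively the flush/commit cadence
execution.checkpointing.interval: 5min
```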

Results

I started a Flink job with upserts enabled and no tuning; it took approximately 3.5 hours to complete.

Upon investigation, the issues identified in the previous blog post were evident: even with an efficient Iceberg sink, a slow CDC source significantly hampers overall throughput.

To address this, I increased the fetch size from 1024 to 5000 and the split size from 8096 to 50000. Increasing them further led to out-of-memory errors, as these settings control how fast data is pulled from the underlying database.
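In Flink CDC option terms (my mapping of "fetch size" and "split size" onto the connector's option names, based on their matching defaults), the source table's WITH clause would gain:

```sql
'scan.snapshot.fetch.size' = '5000',              -- rows fetched per read (default 1024)
'scan.incremental.snapshot.chunk.size' = '50000'  -- rows per snapshot split (default 8096)
```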

I also extended the Flink checkpoint interval to 5 minutes.

Adjustments to the Iceberg sink itself (such as changing the distribution mode) did not yield improvements.

The tuned version finished in just under two hours. The BytesUploaded metric for the underlying S3 bucket is as follows:

CPU and memory usage were elevated but remained manageable:

Parquet file sizes varied significantly:

Controlling these sizes proved difficult: there is a “write.target-file-size-bytes” configuration, but file creation appears to be governed primarily by the checkpoint interval.

Switching to append-only mode (disabling upserts) brought the job completion time down to 1.5 hours.

The produced Parquet files were much more uniform:

Kafka Connect

Kafka Connect supports the Postgres CDC connector (via Debezium) and the Iceberg connector. However, it relies on Kafka as an intermediary layer.

The Iceberg connector configuration utilized was as follows:

topics: debezium.public.lineitem, # the rest of the topics

# Transform Debezium envelope into CDC format and set _cdc.target for routing
transforms: debezium
transforms.debezium.type: org.apache.iceberg.connect.transforms.DebeziumTransform
transforms.debezium.cdc.target.pattern: ${env:ICEBERG_GLUE_DATABASE}.{table}

# Catalog (AWS Glue)
iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.warehouse: s3://${env:ICEBERG_S3_BUCKET}/
iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO

# Table settings
iceberg.tables.dynamic-enabled: true
iceberg.tables.route-field: _cdc.target
iceberg.tables.auto-create-enabled: true

# Commit coordination
iceberg.control.topic: iceberg-control
iceberg.control.commit.interval-ms: 300000

The “dynamic-enabled” and “route-field” options enable behavior similar to Flink’s Dynamic Sink: data is automatically routed to different tables based on the metadata added by DebeziumTransform.

The commit interval was set to 5 minutes, aligning with the Flink checkpointing interval.

The connector does not currently support upserts.

Results

The Kafka Connect connectors completed in approximately two hours. The BytesUploaded metric for the underlying S3 bucket is as follows:

CPU usage was notably high throughout the duration:

While the CPU bottleneck raised concerns, profiling did not reveal any anomalies:

The distribution of CPU usage appeared balanced between the source (CDC processing, JSON serialization) and the sink (JSON deserialization, Parquet writing).

The underlying Parquet sizes were well distributed:

Spark

Apache Spark lacks a dedicated CDC connector like Flink or Kafka Connect. To build a complete Postgres-to-Iceberg pipeline, I therefore used Kafka Connect with the Debezium connector to capture Postgres tables as Kafka topics; the Spark job then consumed this data and wrote it to Iceberg. Spark has robust support for Iceberg, having been one of the first tools to adopt it.

Unfortunately, Spark supports neither the Dynamic Sink concept (as in Flink) nor field-based routing (as in Kafka Connect). When writing to multiple sinks, the following options are available:

  • foreachBatch: a specialized operator providing precise control over batch writes. While many examples exist, Databricks advises against using it for writing to multiple sinks.
  • Query per table: not a true routing solution, but a practical workaround. Iterate over the list of input topics at startup and start a streaming query for each table.

The query per table method generates a series of queries similar to the following:

Dataset<Row> raw = spark.readStream()
    .format("kafka")
    .options(kafkaOptions)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load();

Dataset<Row> result = … // data parsing and processing

result.writeStream()
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(triggerIntervalMs))
    .option("checkpointLocation", checkpointBase + "/" + table.name())
    .toTable(fullTableName);

Results

I concentrated on load-testing the Spark component of the job, specifically from Kafka to Iceberg.

I first tried the foreachBatch approach, which resulted in a runtime of over 4 hours.

Subsequently, I transitioned to the query-per-table approach and began refining the configuration:

  • Tuned Kafka consumer settings (max.poll.records, fetch.max.bytes, etc.) to reduce the overhead of frequent polling when reading from Kafka.
  • Adjusted maxOffsetsPerTrigger. Setting it too low results in numerous small files (incurring extra overhead), while setting it too high leads to a single batch for the entire topic partition.
  • Optimized triggers, batch caching, parsing and filtering, memory allocation, and shuffle partitions.
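A sketch of how these knobs attach to the Kafka reader from the earlier snippet (option names from the Spark Kafka source integration; the values are illustrative, not the exact ones used):

```java
Dataset<Row> raw = spark.readStream()
    .format("kafka")
    .option("subscribe", topic)
    .option("maxOffsetsPerTrigger", 2_000_000L)  // caps rows per micro-batch
    .option("kafka.max.poll.records", 10_000)    // fewer, larger consumer polls
    .option("kafka.fetch.max.bytes", 52_428_800) // 50 MB fetches
    .load();
```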

Despite these efforts, the most efficient run concluded in 3 hours and 20 minutes. The BytesUploaded metric for the underlying S3 bucket is as follows:

The executor CPU usage remained relatively low:

This performance comes down to Spark’s architecture, which is designed for large-scale infrastructure. It struggled on a single executor with limited cores: eight independent queries (one per topic) competed for four task slots, each incurring its own checkpoint and Iceberg commit overhead.

The underlying Parquet sizes were mostly uniform, primarily influenced by maxOffsetsPerTrigger:

Data correctness

Validation confirmed that all tools synchronized the data correctly, without loss or duplication: all table counts matched between Postgres and Iceberg.
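The per-table check amounts to comparing row counts on both sides, e.g. (catalog and namespace names illustrative):

```sql
-- Postgres
SELECT count(*) FROM public.lineitem;  -- expect 300,000,000 at SF=50

-- Iceberg (e.g. via Spark SQL or Athena against the Glue catalog)
SELECT count(*) FROM tpch.lineitem;
```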

Spot checks of the actual data revealed only minor discrepancies in column order and additional metadata columns.

Summary

The final comparison of the best test runs:

  • Supermetal: 13 minutes
  • Flink (append-only, tuned): ~90 minutes
  • Kafka Connect: ~120 minutes
  • Spark (query per table, tuned): ~200 minutes

Supermetal achieved at least 7x faster snapshotting without requiring any tuning, which I attribute primarily to its very fast CDC source and low serialization/deserialization overhead. The impact of a slower CDC source was evident in Flink’s and Debezium’s performance, as highlighted in the previous benchmark.

Additionally, Supermetal’s ability to differentiate between snapshotting and live CDC phases end-to-end allows for sink-level optimizations, such as utilizing append-only mode and a target file size during snapshotting, while switching to merge-on-read mode and a time-based interval during the live CDC phase.

Flink achieved the next best performance, albeit with significant tuning requirements. Both Flink and Kafka Connect provide mechanisms for dynamically routing source data into multiple Iceberg tables, whereas Spark necessitates separate queries for each table.

Note that this test focused on single-node performance. Many of these tools can scale horizontally (all of the Iceberg sinks can; on the CDC side, currently only Flink can), but scaling out can quickly become cost-prohibitive. Supermetal also allows some horizontal scaling, for instance by using a single table as the unit of scaling.
