
CDC Pipeline

The CDC pipeline synchronises every data mutation from PostgreSQL (OLTP) to ClickHouse (OLAP) in near-real-time. Analytical queries never touch the transactional database.

Latency guarantee: < 5 seconds end-to-end under normal load. Delivery guarantee: at-least-once (duplicates resolved in ClickHouse via ReplacingMergeTree).


Pipeline Overview

 PostgreSQL 17                      Kafka                     ClickHouse
 (OLTP / WAL)       Debezium        platform.data.events      (ReplacingMergeTree)
┌───────────┐      ┌───────┐      ┌─────────────────┐      ┌──────────────────┐
│ WAL slot  │─────▶│ CDC   │─────▶│ Partition key:  │─────▶│ Engine:          │
│ (logical  │      │ Conn. │      │ entity_id       │      │ ReplacingMerge   │
│ replica)  │      │       │      │ Retention: 7d   │      │ Tree(updated_at) │
└───────────┘      └───────┘      └─────────────────┘      └──────────────────┘
      ▲                                    │                         │
      │ WAL slot heartbeat                 │ Consumer offset         ▼
      └────────────────────────────────────┘                 SELECT ... FINAL
                                                             (dedup at read time)

Flow Step-by-Step

1. A write commits in PostgreSQL; the mutation is recorded in the WAL.
2. Debezium tails the WAL through a logical replication slot and publishes a change event to the Kafka topic platform.data.events (7-day retention).
3. The ClickHouse consumer reads events in batches, inserts them into the ReplacingMergeTree table, and commits its Kafka offset.
4. Duplicates from at-least-once delivery are collapsed by background merges and by SELECT ... FINAL at read time.


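The consumer's per-event transform can be sketched in TypeScript. The envelope fields (op, before, after, ts_ms) follow Debezium's standard change-event format; everything else below (the payload shape, the toRow helper) is illustrative, not the actual consumer code.

```typescript
// Sketch: map a Debezium change event onto an analytics.data_records row.
// Envelope fields follow Debezium's format; the rest is illustrative.

interface DebeziumEvent {
  op: "c" | "u" | "d";                    // create / update / delete
  before: Record<string, unknown> | null; // row image before the change
  after: Record<string, unknown> | null;  // row image after the change (null on delete)
  ts_ms: number;                          // source commit timestamp (epoch millis)
}

interface ClickHouseRow {
  tenant_id: string;
  record_id: string;
  payload: string;    // JSON blob
  updated_at: number; // stands in for DateTime64(3)
  is_deleted: 0 | 1;
}

function toRow(event: DebeziumEvent): ClickHouseRow {
  // Deletes carry the row image in `before`; creates/updates in `after`.
  const image = (event.op === "d" ? event.before : event.after) ?? {};
  return {
    tenant_id: String(image["tenant_id"]),
    record_id: String(image["record_id"]),
    payload: JSON.stringify(image),
    updated_at: event.ts_ms,
    is_deleted: event.op === "d" ? 1 : 0, // soft delete, retained for audit
  };
}
```

Note that a delete is not a removal: it becomes a normal insert with is_deleted = 1 and a newer updated_at, so ReplacingMergeTree keeps the tombstone as the surviving row version.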
Deduplication Strategy

ClickHouse does not natively prevent duplicate inserts from at-least-once Kafka delivery. Platform-Kernel uses a three-tier approach:

Layer       Mechanism                                                      When it applies
──────────  ─────────────────────────────────────────────────────────────  ──────────────────────
Write-time  Idempotent INSERT: consumer uses upsert semantics              Kafka consumer restart
Merge-time  ReplacingMergeTree(updated_at): keeps row with max updated_at  Background merge
Read-time   SELECT ... FINAL: collapses duplicates in query                All analytical reads

Always use SELECT ... FINAL in ClickHouse queries against CDC tables to guarantee consistent (deduplicated) results. Without FINAL, ClickHouse may return both the original and the replacement row before the background merge has completed.
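The merge-time and read-time layers converge on the same rule: per sorting key (tenant_id, record_id), only the row with the greatest updated_at survives. A minimal in-memory sketch of that rule (the collapseFinal helper and Row type are illustrative, not actual pipeline code):

```typescript
// Sketch: what ReplacingMergeTree(updated_at) merges, and SELECT ... FINAL
// reads, collapse to: one row per sorting key, the one with max updated_at.

interface Row {
  tenant_id: string;
  record_id: string;
  payload: string;
  updated_at: number; // stands in for DateTime64(3)
}

function collapseFinal(rows: Row[]): Row[] {
  const latest = new Map<string, Row>();
  for (const row of rows) {
    const key = `${row.tenant_id}:${row.record_id}`; // mirrors ORDER BY
    const seen = latest.get(key);
    if (!seen || row.updated_at > seen.updated_at) latest.set(key, row);
  }
  return Array.from(latest.values());
}
```

A duplicate delivery of the same event carries the same updated_at, so it collapses into a single row; a newer version of the record simply wins.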


ClickHouse Table Design

-- Example: tenant data analytics table
CREATE TABLE analytics.data_records
(
    tenant_id   UUID,
    record_id   UUID,
    module_id   String,
    model_name  String,
    payload     String,         -- JSON blob
    updated_at  DateTime64(3),
    is_deleted  UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY (toYYYYMM(updated_at), tenant_id)
ORDER BY (tenant_id, record_id);

Key design decisions:

Decision                            Rationale
──────────────────────────────────  ────────────────────────────────────────────────────
PARTITION BY (toYYYYMM, tenant_id)  Prune by month AND tenant: fast WHERE tenant_id = ?
ORDER BY (tenant_id, record_id)     Sorting key = dedup key for ReplacingMergeTree
ReplacingMergeTree(updated_at)      Keeps the highest updated_at on merge
is_deleted = 1                      Soft delete via CDC op: "d"; data retained for audit
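The month component of the partition key can be illustrated with a small TypeScript equivalent of ClickHouse's toYYYYMM() (purely illustrative; ClickHouse computes this server-side):

```typescript
// Sketch: the partition value ClickHouse derives from updated_at.
// Each data part belongs to one (month, tenant) pair, so queries filtered
// on tenant_id and a date range only touch matching partitions.

function toYYYYMM(d: Date): number {
  return d.getUTCFullYear() * 100 + (d.getUTCMonth() + 1);
}

// A row updated on 2026-04-22 lands in the 202604 partition for its tenant.
```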

WAL Retention and Bloat Protection

PostgreSQL WAL Slot Configuration
──────────────────────────────────────────────────────────
wal_level = logical (required for Debezium)
wal_keep_size = 1024 MB (minimum 1 GB buffer)
max_slot_wal_keep_size = 50 GB (PostgreSQL 13+)
└─ If Debezium is offline for too long and the slot grows
beyond 50 GB → PostgreSQL invalidates the slot
→ Debezium triggers a FULL SNAPSHOT (re-sync)
→ Alert fires BEFORE this threshold is reached
──────────────────────────────────────────────────────────

Alert thresholds (OpenTelemetry → VictoriaMetrics):
pg_replication_slot_wal_bytes > 10 GB → CRITICAL alert
→ Notify Service: channel system.health
→ PagerDuty (if configured)

Rationale: preventing unbounded WAL growth avoids the most dangerous failure mode: PostgreSQL disk full → crash → full cluster downtime.

Recovery Policy

Scenario                              Recovery action                         Data loss?
────────────────────────────────────  ──────────────────────────────────────  ─────────────────
Debezium restart (< WAL retention)    Resume from last Kafka offset           None
WAL slot invalidated (> 50 GB bloat)  Full auto-snapshot re-sync              None (idempotent)
ClickHouse down                       Kafka buffers events (7-day retention)  None
Debezium lost offset                  Full snapshot → resume streaming        None

Eventual Consistency Disclosure

The CDC pipeline is not synchronous. Analytical data lags the transactional source by up to 5 seconds under normal conditions and longer during incident recovery.

The Data Layer SDK exposes this explicitly:

const result = await kernel.data().analytics({
  model: "orders",
  aggregate: { sum: "amount_cents" }
});

// result.meta.dataAsOf — ISO 8601 timestamp of last CDC sync
// result.data — the aggregated rows
console.log(result.meta.dataAsOf); // "2026-04-22T11:14:37Z"

UI Shell can render a freshness badge using meta.dataAsOf:

┌──────────────────────────────────┐
│  Revenue (today)                 │
│  $42,817                         │
│  ⓘ Data as of 11:14:37           │
└──────────────────────────────────┘

The meta.dataAsOf value is backed by lastSyncAt, the timestamp of the last CDC event that the ClickHouse consumer committed. It is stored per-tenant in Valkey (cdc:last_sync:{tenantId}) and refreshed on every successful consumer batch write.
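The badge logic itself can be sketched from meta.dataAsOf alone. A hedged sketch: the freshnessBadge helper and its 60-second staleness cutoff are illustrative UI choices, not part of the SDK (the pipeline only guarantees < 5 s lag under normal load):

```typescript
// Sketch: derive a freshness badge from result.meta.dataAsOf.
// The 60 s cutoff is an illustrative UI choice, not a pipeline guarantee.

function freshnessBadge(dataAsOf: string, now: Date = new Date()): string {
  const lagMs = now.getTime() - new Date(dataAsOf).getTime();
  const time = dataAsOf.slice(11, 19); // HH:MM:SS from the ISO 8601 string
  return lagMs > 60_000
    ? `⚠ Data as of ${time} (may be stale)`
    : `ⓘ Data as of ${time}`;
}
```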


Monitoring

Metric                          Source                           Alert threshold
──────────────────────────────  ───────────────────────────────  ────────────────
debezium_lag_seconds            Debezium JMX → OTLP              > 300 s critical
pg_replication_slot_wal_bytes   PostgreSQL pg_replication_slots  > 10 GB critical
kafka_consumer_lag_records      Kafka consumer group metrics     > 10 000 warning
clickhouse_insert_errors_total  ClickHouse system tables         > 0 warning

All metrics are exported via OpenTelemetry → VictoriaMetrics and visible in Grafana.
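For reference, the thresholds above can be expressed as a small rule set. A sketch only: the evaluate helper is illustrative, not the actual VictoriaMetrics alerting rules; metric names match the table.

```typescript
// Sketch: the alert thresholds from the monitoring table as checkable rules.

type Severity = "critical" | "warning";

const thresholds: Record<string, { limit: number; severity: Severity }> = {
  debezium_lag_seconds:           { limit: 300,            severity: "critical" },
  pg_replication_slot_wal_bytes:  { limit: 10 * 1024 ** 3, severity: "critical" },
  kafka_consumer_lag_records:     { limit: 10_000,         severity: "warning" },
  clickhouse_insert_errors_total: { limit: 0,              severity: "warning" },
};

function evaluate(metric: string, value: number): Severity | "ok" {
  const rule = thresholds[metric];
  return rule && value > rule.limit ? rule.severity : "ok";
}
```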


  • Data Flow — CDC sequence diagram in context of the full system
  • Service Map — Data Layer dependency on ClickHouse and Kafka
  • Deduplication — 3-layer dedup: PostgreSQL, Kafka, ClickHouse
  • Tenant Isolation — ClickHouse dual-layer row policy