ClickHouseSeniorSystem design

Как ClickHouse интегрируется с Kafka для приёма данных в реальном времени?

ClickHouse читает Kafka через встроенный движок Kafka (Kafka engine table), который в связке с Materialized View непрерывно опрашивает топик и записывает батчи в целевую MergeTree-таблицу; число consumer-потоков ограничено числом партиций топика, а offsets хранятся в Kafka consumer group.

Интеграция ClickHouse с Kafka

ClickHouse имеет встроенный движок таблиц Kafka, который работает как consumer: читает сообщения из топиков, конвертирует их в строки и передаёт в материализованное представление (Materialized View), которое пишет в целевую MergeTree-таблицу. Это стандартный паттерн приёма данных в реальном времени без внешних сервисов (Kafka Connect, Flink и др.).

Архитектура потока данных

  • Kafka topicKafka engine table (читает сообщения) → Materialized View (трансформирует) → MergeTree table (хранит)
  • Kafka-таблица является виртуальной: SELECT из неё потребляет сообщения и не позволяет читать их повторно.
  • Offsets хранятся в Kafka (consumer group), а не в ClickHouse.

Шаг 1: Целевая таблица хранения

CREATE TABLE events_raw
(
    event_time  DateTime,
    user_id     UInt64,
    event_type  String,
    payload     String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time);

Шаг 2: Kafka engine table

CREATE TABLE events_kafka
(
    event_time  DateTime,
    user_id     UInt64,
    event_type  String,
    payload     String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536,
    kafka_poll_timeout_ms = 500;

Шаг 3: Materialized View

CREATE MATERIALIZED VIEW events_mv
TO events_raw
AS
SELECT
    event_time,
    user_id,
    event_type,
    payload
FROM events_kafka;

Поддерживаемые форматы

  • JSONEachRow — один JSON-объект на строку (NDJSON)
  • AvroConfluent — Avro с Schema Registry
  • Protobuf — требует .proto файл в format_schema_path
  • CSV, TSV, RawBLOB и др.

Мониторинг потребления

-- Статус consumer threads
SELECT *
FROM system.kafka_consumers
WHERE database = 'default' AND table = 'events_kafka';

-- Метрики из system.metrics
SELECT metric, value
FROM system.metrics
WHERE metric LIKE 'Kafka%';

Настройка для высокой пропускной способности

-- kafka_num_consumers должен быть <= числу партиций в топике
-- kafka_max_block_size определяет размер батча (строк) перед записью в MV
ALTER TABLE events_kafka MODIFY SETTING
    kafka_num_consumers = 8,
    kafka_max_block_size = 100000,
    kafka_flush_interval_ms = 7500;

Схема с dead-letter queue

-- Добавляем вспомогательную таблицу для ошибок парсинга
CREATE TABLE events_errors
(
    raw_message String,
    error       String,
    received_at DateTime DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY received_at;

-- В MV используем _error virtual column (ClickHouse 23.9+)
CREATE MATERIALIZED VIEW events_mv_errors
TO events_errors
AS
SELECT
    _raw_message AS raw_message,
    _parse_error AS error
FROM events_kafka
WHERE _parse_error != '';

Подводные камни

  • Kafka-таблица не поддерживает транзакционную семантику exactly-once «из коробки» — при сбое ClickHouse до commit offset возможны дубликаты; используйте ReplacingMergeTree или дедупликацию по уникальному полю.
  • kafka_num_consumers не может превышать число партиций топика — лишние consumer threads простаивают, увеличивая load без пользы.
  • DROP TABLE events_kafka немедленно сбрасывает consumer group offset — при пересоздании таблица начнёт читать с latest (или earliest по настройке), что ведёт к пропуску или дублированию данных.
  • Materialized View и Kafka-таблица должны находиться в одной базе данных — межбазовый MV с Kafka не работает корректно.
  • SELECT из Kafka-таблицы потребляет сообщения безвозвратно — никогда не делайте вручную SELECT для отладки в production; используйте system.kafka_consumers.
  • При изменении схемы Kafka-таблицы (ALTER) нужно пересоздавать MV — ALTER не обновляет MV автоматически.
  • Kafka engine в ClickHouse Cloud имеет ограничения на версию librdkafka и не поддерживает все настройки kafka_* — часть настроек игнорируется молча.
  • При высокой latency сети между ClickHouse и Kafka broker kafka_poll_timeout_ms нужно увеличивать — иначе consumer threads тратят большую часть времени на ожидание.

Common mistakes

  • Объяснять kafka integration как OLTP-механику row-store базы вместо аналитической колоночной модели ClickHouse.
  • Путать primary key ClickHouse с уникальным constraint из PostgreSQL или MySQL.
  • Игнорировать parts, merges, ORDER BY, sparse index и стоимость маленьких вставок.
  • Предлагать синтаксис или транзакционное поведение, которого в ClickHouse нет.

What the interviewer is testing

  • Кандидат объясняет kafka integration через реальный механизм ClickHouse, а не общими словами.
  • Приводит корректный SQL или диагностический запрос для этой СУБД.
  • Называет ограничения, версионные отличия или эксплуатационные последствия.
  • Связывает ответ с проектированием приложения, производительностью, надежностью или безопасностью.

Sources

Related topics