Табличный движок Kafka
Если вы используете ClickHouse Cloud, мы рекомендуем вместо этого использовать ClickPipes. ClickPipes нативно поддерживает приватные сетевые соединения, независимое масштабирование ресурсов для приёма данных и ресурсов кластера, а также всесторонний мониторинг при потоковой загрузке данных из Kafka в ClickHouse.
- Публиковать или подписываться на потоки данных.
- Организовывать отказоустойчивое хранилище.
- Обрабатывать потоки по мере их поступления.
Создание таблицы
Обязательные параметры:
kafka_broker_list— список брокеров, разделённых запятыми (например,localhost:9092).kafka_topic_list— список топиков Kafka.kafka_group_name— группа потребителей Kafka. Смещения чтения отслеживаются отдельно для каждой группы. Если вы не хотите дублирования сообщений в кластере, используйте одно и то же имя группы везде.kafka_format— формат сообщений. Использует ту же нотацию, что и SQL-функцияFORMAT, например,JSONEachRow. Для получения дополнительной информации см. раздел Formats.
Необязательные параметры:
kafka_security_protocol— Протокол, используемый для связи с брокерами. Возможные значения:plaintext,ssl,sasl_plaintext,sasl_ssl.kafka_sasl_mechanism— Механизм SASL, используемый для аутентификации. Возможные значения:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username— Имя пользователя SASL для использования с механизмамиPLAINиSASL-SCRAM-...kafka_sasl_password— Пароль SASL для использования с механизмамиPLAINиSASL-SCRAM-...kafka_schema— Параметр, который должен использоваться, если формат требует определения схемы. Например, Cap'n Proto требует путь к файлу схемы и имя корневого объектаschema.capnp:Message.kafka_schema_registry_skip_bytes— Количество байт, которые нужно пропустить с начала каждого сообщения при использовании реестра схем с envelope-заголовками (например, AWS Glue Schema Registry, который добавляет 19-байтовый envelope). Диапазон:[0, 255]. По умолчанию:0.kafka_num_consumers— Количество консьюмеров на таблицу. Укажите больше консьюмеров, если пропускной способности одного консьюмера недостаточно. Общее количество консьюмеров не должно превышать количество партиций в топике, так как к каждой партиции может быть привязан только один консьюмер, и не должно быть больше числа физических ядер на сервере, где развернут ClickHouse. По умолчанию:1.kafka_max_block_size— Максимальный размер пакета (в сообщениях) для одного poll. По умолчанию: max_insert_block_size.kafka_skip_broken_messages— Допустимое количество несовместимых со схемой сообщений Kafka на блок для парсера. Еслиkafka_skip_broken_messages = N, то движок пропускает N сообщений Kafka, которые не могут быть разобраны (одно сообщение соответствует одной строке данных). По умолчанию:0.kafka_commit_every_batch— Фиксировать (commit) каждый считанный и обработанный пакет вместо одного коммита после записи всего блока. По умолчанию:0.kafka_client_id— Идентификатор клиента. По умолчанию пустая строка.kafka_poll_timeout_ms— Таймаут для одного poll из Kafka. По умолчанию: stream_poll_timeout_ms.kafka_poll_max_batch_size— Максимальное количество сообщений, извлекаемых за один poll из Kafka. По умолчанию: max_block_size.kafka_flush_interval_ms— Таймаут для сброса данных из Kafka. По умолчанию: stream_flush_interval_ms.kafka_consumer_reschedule_ms— Интервал повторного планирования, когда обработка потока Kafka застопорилась (например, когда нет доступных для чтения сообщений). Этот параметр управляет задержкой перед повторной попыткой опроса консьюмером. Не должен превышатьkafka_consumers_pool_ttl_ms. По умолчанию:500миллисекунд.kafka_thread_per_consumer— Выделять отдельный поток для каждого консьюмера. Если включено, каждый консьюмер сбрасывает данные независимо и параллельно (в противном случае строки от нескольких консьюмеров объединяются в один блок). По умолчанию:0.kafka_handle_error_mode— Способ обработки ошибок для движка Kafka. Возможные значения:default(будет выброшено исключение, если не удалось разобрать сообщение),stream(сообщение об ошибке и исходное сообщение будут сохранены в виртуальных столбцах_errorи_raw_message),dead_letter_queue(данные, связанные с ошибкой, будут сохранены вsystem.dead_letter_queue).kafka_commit_on_select— Фиксировать сообщения при выполнении запросаSELECT. По умолчанию:false.kafka_max_rows_per_message— Максимальное количество строк, записываемых в одно сообщение Kafka для форматов, основанных на строках. По умолчанию:1.kafka_compression_codec— Кодек сжатия, используемый при публикации сообщений. Поддерживаются: пустая строка,none,gzip,snappy,lz4,zstd. В случае пустой строки кодек сжатия не задаётся таблицей, поэтому будут использованы значения из конфигурационных файлов или значение по умолчанию изlibrdkafka. По умолчанию: пустая строка.kafka_compression_level— Параметр уровня сжатия для алгоритма, выбранного вkafka_compression_codec. Более высокие значения обеспечивают лучшее сжатие за счёт большего использования CPU. Допустимый диапазон зависит от алгоритма:[0-9]дляgzip;[0-12]дляlz4; только0дляsnappy;[0-12]дляzstd;-1= уровень сжатия по умолчанию для конкретного кодека. По умолчанию:-1.
Примеры:
Устаревший способ создания таблицы
Не используйте этот способ в новых проектах. По возможности переведите старые проекты на метод, описанный выше.
Табличный движок Kafka не поддерживает столбцы со значениями по умолчанию. Если вам нужны такие столбцы, их можно добавить на уровне materialized view (см. ниже).
Описание
Доставленные сообщения отслеживаются автоматически, поэтому каждое сообщение в группе учитывается только один раз. Если вам нужно получить данные дважды, создайте копию таблицы с другим именем группы.
Группы гибкие и синхронизируются в кластере. Например, если у вас есть 10 топиков и 5 копий таблицы в кластере, то каждая копия получает по 2 топика. Если количество копий меняется, топики автоматически перераспределяются между копиями. Подробнее об этом читайте на http://kafka.apache.org/intro.
Рекомендуется, чтобы у каждого топика Kafka была своя собственная группа потребителей, что обеспечивает эксклюзивную связь между топиком и группой, особенно в средах, где топики могут динамически создаваться и удаляться (например, в тестовой или промежуточной среде).
SELECT мало полезен для чтения сообщений (кроме отладки), потому что каждое сообщение может быть прочитано только один раз. На практике удобнее создавать потоки в реальном времени с помощью материализованных представлений. Для этого:
- Используйте движок для создания Kafka-потребителя и рассматривайте его как поток данных.
- Создайте таблицу с нужной структурой.
- Создайте материализованное представление, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW привязан к движку, он начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из Kafka и преобразовывать их в требуемый формат с помощью SELECT.
Одна таблица Kafka может иметь любое количество материализованных представлений: они не читают данные из таблицы Kafka напрямую, а получают новые записи (блоками). Таким образом, можно записывать данные в несколько таблиц с разной степенью детализации (с группировкой — агрегацией и без).
Пример:
Чтобы повысить производительность, полученные сообщения группируются в блоки размером max_insert_block_size. Если в течение stream_flush_interval_ms миллисекунд блок не был сформирован, данные будут принудительно записаны в таблицу независимо от полноты блока.
Чтобы прекратить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:
Если вы хотите изменить целевую таблицу с помощью ALTER, рекомендуем отключить материализованное представление, чтобы избежать расхождений между целевой таблицей и данными из представления.
Конфигурация
Как и движок GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с использованием файла конфигурации ClickHouse. Вы можете использовать два ключа конфигурации: глобальный (в секции <kafka>) и на уровне топика (в секции <kafka><kafka_topic>). Сначала применяется глобальная конфигурация, а затем — конфигурация для конкретного топика (если она задана).
Список доступных параметров конфигурации приведён в справочнике по конфигурации librdkafka. Используйте символ подчёркивания (_) вместо точки в конфигурации ClickHouse. Например, check.crcs=true будет записано как <check_crcs>true</check_crcs>.
Поддержка Kerberos
Для работы с Kafka с поддержкой Kerberos добавьте дочерний элемент security_protocol со значением sasl_plaintext. Этого достаточно при наличии действительного Kerberos ticket-granting ticket (TGT), уже полученного и закэшированного средствами операционной системы.
ClickHouse может самостоятельно поддерживать учетные данные Kerberos с использованием файла keytab. Для этого используйте дочерние элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal.
Пример:
Виртуальные столбцы
_topic— топик Kafka. Тип данных:LowCardinality(String)._key— ключ сообщения. Тип данных:String._offset— смещение сообщения. Тип данных:UInt64._timestamp— временная метка сообщения. Тип данных:Nullable(DateTime)._timestamp_ms— временная метка сообщения в миллисекундах. Тип данных:Nullable(DateTime64(3))._partition— партиция топика Kafka. Тип данных:UInt64._headers.name— массив ключей заголовков сообщения. Тип данных:Array(String)._headers.value— массив значений заголовков сообщения. Тип данных:Array(String).
Дополнительные виртуальные столбцы при kafka_handle_error_mode='stream':
_raw_message— исходное сообщение, которое не удалось разобрать. Тип данных:String._error— текст исключения, возникшего при неуспешном разборе. Тип данных:String.
Примечание: виртуальные столбцы _raw_message и _error заполняются только в случае возникновения исключения во время разбора; при успешном разборе сообщения они всегда пусты.
Поддержка форматов данных
Движок Kafka поддерживает все форматы, поддерживаемые в ClickHouse. Количество строк в одном сообщении Kafka зависит от того, является ли формат построчным или блочным:
- Для построчных форматов количество строк в одном сообщении Kafka можно контролировать с помощью настройки
kafka_max_rows_per_message. - Для блочных форматов мы не можем разделить блок на более мелкие части, но количество строк в одном блоке можно контролировать с помощью общего параметра настройки max_block_size.
Движок для хранения зафиксированных смещений в ClickHouse Keeper
Если включён параметр allow_experimental_kafka_offsets_storage_in_keeper, для табличного движка Kafka можно задать ещё два параметра:
kafka_keeper_pathзадаёт путь к таблице в ClickHouse Keeperkafka_replica_nameзадаёт имя реплики в ClickHouse Keeper
Оба параметра должны быть либо заданы вместе, либо оба опущены. Если оба параметра заданы, используется новый, экспериментальный движок Kafka. Новый движок не зависит от хранения зафиксированных смещений в Kafka и хранит их в ClickHouse Keeper. Он по-прежнему пытается зафиксировать смещения в Kafka, но опирается на эти смещения только при создании таблицы. Во всех остальных ситуациях (перезапуск таблицы или восстановление после ошибки) в качестве смещения, с которого продолжается потребление сообщений, будут использоваться смещения, сохранённые в ClickHouse Keeper. Помимо зафиксированного смещения, он также сохраняет, сколько сообщений было прочитано в последнем пакете, поэтому, если операция вставки завершилась с ошибкой, будет прочитано то же количество сообщений, что позволяет при необходимости включить дедупликацию.
Пример:
Известные ограничения
Поскольку новый движок является экспериментальным, он ещё не готов для промышленного использования. На данный момент в реализации есть несколько известных ограничений:
- Наибольшее ограничение заключается в том, что движок не поддерживает прямое чтение. Чтение из движка с использованием материализованных представлений и запись в движок работают, но прямое чтение — нет. В результате все прямые запросы
SELECTбудут завершаться с ошибкой. - Быстрое удаление и пересоздание таблицы или указание одного и того же пути ClickHouse Keeper для разных движков может вызывать проблемы. В качестве рекомендуемой практики можно использовать
{uuid}вkafka_keeper_path, чтобы избежать конфликтов путей. - Для обеспечения повторяемости чтения сообщения не могут потребляться из нескольких партиций в одном потоке. С другой стороны, потребители Kafka должны регулярно опрашиваться, чтобы оставаться «живыми». В результате этих двух требований мы решили разрешать создание нескольких потребителей только в том случае, если включён
kafka_thread_per_consumer, в противном случае слишком сложно избежать проблем, связанных с регулярным опросом потребителей. - Потребители, создаваемые новым движком хранения, не отображаются в таблице
system.kafka_consumers.
См. также