# Подключение и работа с кластером Kafka (тариф Base) Раздел содержит описание способов подключения к управлению кластером Kafka. Поддерживаются два типа подключения: - `SASL_PLAINTEXT` - незашифрованное; - `SASL_SSL` - зашифрованное (через SSL-сертификат). Также в разделе приведены примеры работы с клиентскими утилитами и базовые операции администрирования. ## Предварительные требования Для начала работы с кластером Kafka необходимо выполнить предварительную настройку: 1. **Установить клиент Kafka** - скачивается дистрибутив Kafka (например, kafka_2.13-4.2.0) и распаковывается в удобную директорию; 2. **Установить среду выполнения Java** - для работы необходимо установить среду выполнения, с помощью команды: `**sudo apt install openjdk-17-jre-headless**`. Рекомендуется использовать версию OpenJDK 17 или выше; 3. **Доступ к брокерам** - адреса брокеров (bootstrap servers) предоставляются после развертывания кластера; 4. **Учетные данные**: - Имя пользователя (username) - стандартное значение client; - Пароль - предоставляется после развертывания кластера. 5. **Корневой сертификат (для SSL-подключения)** - сертификат для зашифрованного подключения предоставляется файлом при заказе сервиса Kafka. ## Структура клиентского дистрибутива После распаковки архива Kafka в директории bin доступны основные скрипты для управления: | Скрипт | Назначение | | --------------------------- | -------------------------------------------------- | | `kafka-topics.sh` | Управление топиками (создание, удаление, просмотр) | | `kafka-console-producer.sh` | Отправка сообщений в топик | | `kafka-console-consumer.sh` | Чтение сообщений из топика | | `kafka-consumer-groups.sh` | Управление консьюмер-группами | | `kafka-configs.sh` | Изменение конфигурации топиков и других объектов | | `kafka-acls.sh` | Управление списками доступа (ACL) | В разделе ниже приведены примеры использования скриптов. Подробное описание операций с данными и другими скриптами приведено в [официальной документации Kafka](https://kafka.apache.org/42/getting-started/introduction/). Дополнительную информацию можно получить, выполнив любой скрипт с флагом --help: ``` ./kafka-topics.sh --help ``` #### Конфигурационные файлы клиента Для подключения к Kafka используется файл настроек **client.properties**, который содержит параметры аутентификации, протокола и его механизмов (рекомендуются SCRAM_SHA_512 и SCRAM_SHA_256). ## Незашифрованное подключение (SASL_PLAINTEXT) Создается файл **client.properties** со следующим содержимым: ``` sasl.mechanism=SCRAM-SHA-512 security.protocol=SASLPLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \     username="client" \     password="вашпароль"; ``` ::: warning Примечание Для незашифрованного подключения (SASL_PLAINTEXT) используется порт 9091; ::: ## Зашифрованное подключение (SASL_SSL) Для зашифрованного подключения (в production-средах рекомендуется использовать именно его) дополнительно требуется корневой сертификат и хранилище доверенных сертификатов (truststore). ::: warning Примечание Для зашифрованного подключения (SASL_SSL) используется порт 9092. ::: ### Шаг 1. Создание truststore С помощью утилиты **keytool** выполняется импорт сертификата в хранилище: ``` keytool -importcert -storetype PKCS12 -keystore /путь/к/трастстору/truststore.jks -alias myalias -file ca.crt -storepass любойвашпароль -keypass любойвашпароль ``` Параметры: - **keystore** - путь к создаваемому хранилищу; - **alias** - псевдоним сертификата в хранилище; - **file** - путь к загруженному корневому сертификату; - **storepass** - пароль для доступа к хранилищу; - **keypass** - пароль для доступа к ключу. ### Шаг 2. Настройка client.properties В файл **client.properties** добавляются параметры SSL: ``` sasl.mechanism=SCRAM-SHA-512 security.protocol=SASLSSL sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \     username="client" \     password="вашпароль"; ssl.truststore.location=/путь/к/трастстору/truststore.jks ssl.truststore.password=парольотхранилища ``` ## Работа с кластером Kafka ### Создание топиков ``` ./kafka-topics.sh --bootstrap-server :9092,:9092,:9092 --command-config ../config/client.properties --create --topic <имя топика> --partitions <количество партиций> --replication-factor <значение> --config min.insync.replicas=<значение> ``` **Ознакомиться с рекомендациями по настройке топиков можно в разделе [Настройка топиков](./base-tier-topics-guide.md).** По умолчанию топик создаётся с --partitions 10 --replication-factor 3 --config min.insync.replicas=2, поэтому, если нет необходимости создавать топик со специфическими настройками, эти флаги в команде можно не указывать. ### Просмотр списка топиков Выполняется команда с указанием брокеров и файла конфигурации: ``` ./kafka-topics.sh --bootstrap-server :9092,:9092,:9092 --command-config ~/client.properties --list ``` Параметры: - **bootstrap-server** - список брокеров кластера (адреса и порты); command-config - путь к файлу с настройками клиента; - **list** - вывод списка топиков. ### Отправка сообщения в топик Выполняется команда с указанием брокеров, файла конфигурации и топика: ``` ./kafka-console-producer.sh --bootstrap-server :9092,:9092,:9092 --command-config~/client.properties --topic test-topic ``` После выполнения команды сообщения вводятся построчно. Для завершения используется сочетание клавиш Ctrl+C. ### Чтение сообщений из топика Выполняется команда с указанием топика и, при необходимости, consumer group: ``` ./kafka-console-consumer.sh --bootstrap-server :9092,:9092,:9092 --command-config~/client.properties --topic test-topic --from-beginning --group my-new-consumer-group ``` Параметры: - **from-beginning**- чтение всех сообщений с начала (при отсутствии параметра чтение начинается с новых сообщений); - **group** - название consumer group, которая создастся автоматически. ## Управление пользователями и доступом ### Рекомендации к созданию пароля **Длина пароля (рекомендуемая):** - для учётной записи пользователя - не менее 12 знаков; - для учётных записей администраторов, технических и служебных учётных записей - не менее 16 знаков. **Сложность пароля:** рекомендуется использовать уникальный пароль, содержащий символы как минимум трёх из четырёх указанных ниже групп (при отсутствии технических ограничений): - буквы латинского алфавита в верхнем регистре (A-Z); - буквы латинского алфавита в нижнем регистре (a-z); - цифры (0-9); - специальные символы и знаки пунктуации (например, `!@#$%^&*(),.?`). **Периодичность смены:** рекомендуемая периодичность смены пароля - не реже одного раза в год. ### Создание пользователей Для удобства можно написать скрипт, который создает пользователя сразу для двух методов шифрования - SHA-256 и SHA-512: ``` #!/bin/bash username="имя_пользователя" password="сгенерированный_пароль" bootstrap=":9092,:9092,:9092" /opt/kafka/bin/kafka-configs.sh \           --bootstrap-server $bootstrap \           --alter \           --add-config SCRAM-SHA-256=[password="$password"] \           --command-config /opt/kafka/config/client.properties \           --entity-type users \           --entity-name $username /opt/kafka/bin/kafka-configs.sh \           --bootstrap-server $bootstrap \           --alter \           --add-config SCRAM-SHA-512=[password="$password"] \           --command-config /opt/kafka/config/client.properties \           --entity-type users \           --entity-name $username ``` **Запуск скрипта:** ``` sudo ./kafka-user-add.sh ``` ### Просмотр данных о пользователе Для просмотра информации о существующих пользователях выполняется команда: ``` sudo ./kafka-console-consumer.sh --bootstrap-server :9092 --describe --command-config ../config/client.properties --entity-type users ``` ### Управление consumer groups Consumer Group создается автоматически при обращении к ней. Например, при чтении сообщений из топика с указанием этой consumer group: ``` sudo ./kafka-console-consumer.sh --bootstrap-server :9092 --command-config ../config/client.properties --topic test-topic --group test-group --from-beginning ``` ### Просмотр списка ACL Действия с ACL из командной строки осуществляются скриптом kafka-acls.sh. Пример просмотра списка ACL: ``` sudo ./kafka-acls.sh --bootstrap-server :9092,:9092,:9092 --command-config ../config/client.properties --list ``` ## Администрирование кластера #### Изменение объема RAM Для изменения объема оперативной памяти, выделенной под Kafka, направляется обращение в техническую поддержку с указанием требуемого объема RAM. #### Изменение количества брокеров Выполняется также через обращение в техническую поддержку. Брокеров в кластере рекомендуется поддерживать **нечетным** для корректной работы механизмов выборов лидера. После добавления брокера выполняется перераспределение существующих партиций с учетом нового узла с помощью утилиты **kafka-reassign-partitions.sh:** ``` ./kafka-reassign-partitions.sh --bootstrap-server <адреса_брокеров> --generate --topics-to-move-json-file topics.json --broker-list "0,1,2" --execute ``` ::: warning Примечание Операция перераспределения партиций требует времени и может создавать дополнительную нагрузку на кластер. Выполнение рекомендуется в период наименьшей активности. :::