# Подключение и работа с кластером Kafka (тариф Full) Раздел содержит описание способов подключения к управлению кластером Kafka. Поддерживаются два типа подключения: - `SASL_PLAINTEXT` - незашифрованное; - `SASL_SSL` - зашифрованное (через SSL-сертификат). Также в разделе приведены примеры работы с клиентскими утилитами и базовые операции администрирования. ## Предварительные требования Для начала работы с кластером Kafka необходимо выполнить предварительную настройку: 1. **Установить клиент Kafka** - скачивается дистрибутив Kafka (например, kafka_2.13-4.2.0) и распаковывается в удобную директорию; 2. **Установить среду выполнения Java** - для работы необходимо установить среду выполнения, с помощью команды: `**sudo apt install openjdk-17-jre-headless**`. Рекомендуется использовать версию OpenJDK 17 или выше; 3. **Доступ к брокерам** - адреса брокеров (bootstrap servers) предоставляются после развертывания кластера; 4. **Учетные данные**: - Имя пользователя (username) - стандартное значение client; - Пароль - предоставляется после развертывания кластера. 5. **Корневой сертификат (для SSL-подключения)** - сертификат для зашифрованного подключения предоставляется файлом при заказе сервиса Kafka. ## Структура клиентского дистрибутива  После распаковки архива Kafka в директории bin доступны скрипты, например: | Скрипт | Назначение | |---------------------------|--------------------------| |`kafka-console-producer.sh`|Отправка сообщений в топик| |`kafka-console-consumer.sh`|Чтение сообщений из топика| В разделе ниже приведены примеры использования скриптов. Подробное описание операций с данными и другими скриптами приведено в [официальной документации Kafka](https://kafka.apache.org/42/getting-started/introduction/). Дополнительную информацию можно получить, выполнив любой скрипт с флагом --help: ``` ./kafka-console-producer.sh ``` ## Конфигурационные файлы клиента Для подключения к Kafka используется файл настроек **client.properties**, который содержит параметры аутентификации, протокола и его механизмов (рекомендуются SCRAM_SHA_512 и SCRAM_SHA_256). ### Незашифрованное подключение (SASL_PLAINTEXT) Создается файл client.properties со следующим содержимым: ``` sasl.mechanism=SCRAM-SHA-512 security.protocol=SASLPLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \     username="вашлогин" \     password="вашпароль"; ``` ::: warning Примечание Для незашифрованного подключения (SASL_PLAINTEXT) используется порт 9091; ::: ### Зашифрованное подключение (SASL_SSL) Для зашифрованного подключения (в production-средах рекомендуется использовать именно его) дополнительно требуется корневой сертификат и хранилище доверенных сертификатов (truststore). ::: warning Примечание Для зашифрованного подключения (SASL_SSL) используется порт 9092. ::: ### Шаг 1. Создание truststore С помощью утилиты **keytool** выполняется импорт сертификата в хранилище: ``` keytool -importcert -storetype PKCS12 -keystore /путь/к/трастстору/truststore.jks -alias myalias -file ca.crt -storepass любойвашпароль -keypass любойвашпароль ``` Параметры: - **keystore** - путь к создаваемому хранилищу; - **alias** - псевдоним сертификата в хранилище; - **file** - путь к загруженному корневому сертификату; - **storepass** - пароль для доступа к хранилищу; - **keypass** - пароль для доступа к ключу. ### Шаг 2. Настройка client.properties В файл **client.properties** добавляются параметры SSL: ``` sasl.mechanism=SCRAM-SHA-512 security.protocol=SASLSSL sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \     username="вашлогин" \     password="вашпароль"; ssl.truststore.location=/путь/к/трастстору/truststore.jks ssl.truststore.password=парольотхранилища ``` ### Отправка сообщения в топик Выполняется команда с указанием брокеров, файла конфигурации и топика: ``` ./kafka-console-producer.sh --bootstrap-server :9092,:9092,:9092 --command-config~/client.properties --topic test-topic ``` Параметры: - **bootstrap-server** - список брокеров кластера (адреса и порты); - **command-config** - путь к файлу с настройками клиента; После выполнения команды сообщения вводятся построчно. Для завершения используется сочетание клавиш Ctrl+C. ### Чтение сообщений из топика Выполняется команда с указанием топика и, при необходимости, consumer group: ``` ./kafka-console-consumer.sh --bootstrap-server :9092,:9092,:9092 --command-config~/client.properties --topic test-topic --from-beginning --group my-new-consumer-group ``` Параметры: - **from-beginning** - чтение всех сообщений с начала (при отсутствии параметра чтение начинается с новых сообщений); - **group** - название consumer group, которая при её указании создастся автоматически. ## Администрирование кластера ::: warning Примечание Для изменения параметров кластера Kafka (CPU, RAM, DISK, количества брокеров, параметров топиков) необходимо направить запрос в **Service Desk** по адресу `servicedesk@datafort.ru` с указанием требуемых значений. ::: ### Изменение количества брокеров Выполняется также через обращение в техническую поддержку. Брокеров в кластере рекомендуется поддерживать **нечетным** для корректной работы механизмов выборов лидера. ::: warning Примечание Операция перераспределения партиций требует времени и может создавать дополнительную нагрузку на кластер. Выполнение рекомендуется в период наименьшей активности. :::