215 lines
13 KiB
Markdown
215 lines
13 KiB
Markdown
|
|
# Подключение и работа с кластером Kafka (тариф Base)
|
|||
|
|
|
|||
|
|
## О разделе
|
|||
|
|
|
|||
|
|
В данном разделе описаны способы подключения к управляемому кластеру Kafka при тарифе Base. Рассматриваются два варианта: незашифрованное подключение (SASL_PLAINTEXT) и зашифрованное подключение с использованием SSL-сертификатов (SASL_SSL). Также приведены примеры работы с клиентскими утилитами и базовые операции администрирования.
|
|||
|
|
|
|||
|
|
## Предварительные требования
|
|||
|
|
|
|||
|
|
Для работы с кластером Kafka необходимо наличие следующих компонентов:
|
|||
|
|
1. **Установленный клиент Kafka** - скачивается дистрибутив Kafka (например, kafka_2.13-4.2.0) и распаковывается в удобную директорию;
|
|||
|
|
2. **Установленная Java** - для работы с Kafka требуется среда выполнения Java. Установка выполняется командой: **sudo apt install openjdk-17-jre-headless**. Рекомендуется использовать версию OpenJDK 17 или выше;
|
|||
|
|
3. **Доступ к брокерам** - адреса брокеров (bootstrap servers) предоставляются после развертывания кластера;
|
|||
|
|
4. **Учетные данные**: Вам передаются credentials пользователя "client", у которого есть возможность управления кластером.
|
|||
|
|
- Имя пользователя (username) - стандартное значение client;
|
|||
|
|
- Пароль - предоставляется после развертывания кластера.
|
|||
|
|
5. **Корневой сертификат (для SSL-подключения)** - сертификат для зашифрованного подключения предоставляется файлом при заказе сервиса Kafka.
|
|||
|
|
|
|||
|
|
## Структура клиентского дистрибутива
|
|||
|
|
|
|||
|
|
После распаковки архива Kafka в директории bin доступны основные скрипты для управления:
|
|||
|
|
|
|||
|
|
|Скрипт|Назначение|
|
|||
|
|
|---|---|
|
|||
|
|
|**kafka-topics.sh**|Управление топиками (создание, удаление, просмотр)|
|
|||
|
|
|**kafka-console-producer.sh**|Отправка сообщений в топик|
|
|||
|
|
|**kafka-console-consumer.sh**|Чтение сообщений из топика|
|
|||
|
|
|**kafka-consumer-groups.sh**|Управление консьюмер-группами|
|
|||
|
|
|**kafka-configs.sh**|Изменение конфигурации топиков и других объектов|
|
|||
|
|
|**kafka-acls.sh**|Управление списками доступа (ACL)|
|
|||
|
|
В разделе ниже приведены примеры использования скриптов.
|
|||
|
|
|
|||
|
|
Подробное описание операций с данными и другими скриптами приведено в [официальной документации Kafka](https://kafka.apache.org/42/operations/basic-kafka-operations/). Дополнительную информацию можно получить, выполнив любой скрипт с флагом --help:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-topics.sh --help
|
|||
|
|
```
|
|||
|
|
## Конфигурационные файлы клиента
|
|||
|
|
|
|||
|
|
Для подключения к Kafka используется файл настроек (client.properties), который содержит параметры аутентификации и протокола.
|
|||
|
|
|
|||
|
|
### Незашифрованное подключение (SASL_PLAINTEXT)
|
|||
|
|
|
|||
|
|
Создается файл **client.properties** со следующим содержимым:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sasl.mechanism=SCRAM-SHA-512
|
|||
|
|
security.protocol=SASLPLAINTEXT
|
|||
|
|
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
|||
|
|
username="client" \
|
|||
|
|
password="вашпароль";
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Зашифрованное подключение (SASL_SSL)
|
|||
|
|
|
|||
|
|
Для зашифрованного подключения дополнительно требуется корневой сертификат и хранилище доверенных сертификатов (truststore).
|
|||
|
|
|
|||
|
|
#### Шаг 1. Создание truststore
|
|||
|
|
|
|||
|
|
С помощью утилиты **keytool** выполняется импорт сертификата в хранилище:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
keytool -importcert -storetype PKCS12 -keystore /путь/к/трастстору/truststore.jks -alias myalias -file ca.crt -storepass _любойвашпароль -keypass любойвашпароль_
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**Параметры:**
|
|||
|
|
- **keystore** - путь к создаваемому хранилищу;
|
|||
|
|
- **alias** - псевдоним сертификата в хранилище;
|
|||
|
|
- **file** - путь к загруженному корневому сертификату;
|
|||
|
|
- **storepass** - пароль для доступа к хранилищу;
|
|||
|
|
- **keypass** - пароль для доступа к ключу.
|
|||
|
|
|
|||
|
|
#### Шаг 2. Настройка client.properties
|
|||
|
|
|
|||
|
|
В файл **client.properties** добавляются параметры SSL:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sasl.mechanism=SCRAM-SHA-512
|
|||
|
|
security.protocol=SASLSSL
|
|||
|
|
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
|||
|
|
username="client" \
|
|||
|
|
password="вашпароль";
|
|||
|
|
ssl.truststore.location=/путь/к/трастстору/truststore.jks
|
|||
|
|
ssl.truststore.password=парольотхранилища
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## Примеры подключения
|
|||
|
|
|
|||
|
|
Для незашифрованного подключения (SASL_PLAINTEXT) используется порт 9091;
|
|||
|
|
|
|||
|
|
Для зашифрованного подключения (SASL_SSL) используется порт 9092.
|
|||
|
|
|
|||
|
|
### Создание топиков
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-topics.sh --bootstrap-server <IP машины 0>:9092,<IP машины 1>:9092,<IP машины X>:9092 --command-config ../config/client.properties --create --topic <имя топика> --partitions <количество партиций> --replication-factor <значение> --config min.insync.replicas=<значение>
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
Ознакомиться с рекомендациями по настройке топиков можно здесь - [рекомендации](https://confluence.datafort.ru/spaces/OVPS/pages/2557741/%D0%A0%D0%B5%D0%BA%D0%BE%D0%BC%D0%B5%D0%BD%D0%B4%D0%B0%D1%86%D0%B8%D0%B8+%D0%BF%D0%BE+%D0%BD%D0%B0%D1%81%D1%82%D1%80%D0%BE%D0%B9%D0%BA%D0%B5+%D1%82%D0%BE%D0%BF%D0%B8%D0%BA%D0%B0+%D1%82%D0%B0%D1%80%D0%B8%D1%84+Base).
|
|||
|
|
|
|||
|
|
По умолчанию топик создаётся с `--partitions 10 --replication-factor 3 --config min.insync.replicas=2`, поэтому, если нет необходимости создавать топик со специфическими настройками, эти флаги в команде можно не указывать.
|
|||
|
|
|
|||
|
|
### Просмотр списка топиков
|
|||
|
|
|
|||
|
|
Выполняется команда с указанием брокеров и файла конфигурации:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-topics.sh --bootstrap-server <IP_брокера_0>:9092,<IP_брокера_1>:9092,<IP_брокера_2>:9092 --command-config ~/client.properties --list
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**Параметры:**
|
|||
|
|
- **bootstrap-server** - список брокеров кластера (адреса и порты);
|
|||
|
|
- **command-config** - путь к файлу с настройками клиента;
|
|||
|
|
- **list** - вывод списка топиков.
|
|||
|
|
|
|||
|
|
### Отправка сообщения в топик
|
|||
|
|
|
|||
|
|
Выполняется команда с указанием брокеров, файла конфигурации и топика:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-console-producer.sh --bootstrap-server <IP_брокера_0>:9092,<IP_брокера_1>:9092,<IP_брокера_2>:9092 --command-config~/client.properties --topic test-topic
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
После выполнения команды сообщения вводятся построчно. Для завершения используется сочетание клавиш Ctrl+C.
|
|||
|
|
|
|||
|
|
### Чтение сообщений из топика
|
|||
|
|
|
|||
|
|
Выполняется команда с указанием топика и, при необходимости, consumer group:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-console-consumer.sh --bootstrap-server <IP_брокера_0>:9092,<IP_брокера_1>:9092,<IP_брокера_2>:9092 --command-config~/client.properties --topic test-topic --from-beginning --group my-new-consumer-group
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**Параметры:**
|
|||
|
|
- **from-beginning** - чтение всех сообщений с начала (при отсутствии параметра чтение начинается с новых сообщений);
|
|||
|
|
- **group -** название consumer group, которая создастся автоматически.
|
|||
|
|
## Управление пользователями и доступом
|
|||
|
|
|
|||
|
|
### Создание пользователей
|
|||
|
|
|
|||
|
|
Для удобства можно написать скрипт, который создает пользователя сразу для двух методов шифрования - SHA-256 и SHA-512:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
#!/bin/bash
|
|||
|
|
username="имя_пользователя"
|
|||
|
|
password="сгенерированный_пароль"
|
|||
|
|
bootstrap="<IP_брокера_0>:9092,<IP_брокера_1>:9092,<IP_брокера_X>:9092"
|
|||
|
|
|
|||
|
|
/opt/kafka/bin/kafka-configs.sh \
|
|||
|
|
--bootstrap-server $bootstrap \
|
|||
|
|
--alter \
|
|||
|
|
--add-config SCRAM-SHA-256=[password="$password"] \
|
|||
|
|
--command-config /opt/kafka/config/client.properties \
|
|||
|
|
--entity-type users \
|
|||
|
|
--entity-name $username
|
|||
|
|
|
|||
|
|
/opt/kafka/bin/kafka-configs.sh \
|
|||
|
|
--bootstrap-server $bootstrap \
|
|||
|
|
--alter \
|
|||
|
|
--add-config SCRAM-SHA-512=[password="$password"] \
|
|||
|
|
--command-config /opt/kafka/config/client.properties \
|
|||
|
|
--entity-type users \
|
|||
|
|
--entity-name $username
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**Запуск скрипта:**
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sudo ./kafka-user-add.sh
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Просмотр данных о пользователе
|
|||
|
|
|
|||
|
|
Для просмотра информации о существующих пользователях выполняется команда:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sudo ./kafka-console-consumer.sh --bootstrap-server <IP_брокеров>:9092 --describe --command-config ../config/client.properties --entity-type users
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Управление consumer groups
|
|||
|
|
|
|||
|
|
Consumer Group создается автоматически при обращении к ней. Например, при чтении сообщений из топика с указанием этой consumer group:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sudo ./kafka-console-consumer.sh --bootstrap-server <IP_брокеров>:9092 --command-config ../config/client.properties --topic test-topic --group test-group --from-beginning
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Просмотр списка ACL
|
|||
|
|
|
|||
|
|
Действия с ACL из командной строки осуществляются скриптом kafka-acls.sh. Пример просмотра списка ACL:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
sudo ./kafka-acls.sh --bootstrap-server <IP_брокера_0>:9092,<IP_брокера_1>:9092,<IP_брокера_X>:9092 --command-config ../config/client.properties --list
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## Администрирование кластера
|
|||
|
|
|
|||
|
|
### Изменение объема RAM
|
|||
|
|
|
|||
|
|
Для изменения объема оперативной памяти, выделенной под Kafka, направляется обращение в техническую поддержку с указанием требуемого объема RAM.
|
|||
|
|
|
|||
|
|
### Изменение количества брокеров
|
|||
|
|
|
|||
|
|
Выполняется также через обращение в техническую поддержку.
|
|||
|
|
|
|||
|
|
Брокеров в кластере рекомендуется поддерживать **нечетным** для корректной работы механизмов выборов лидера.
|
|||
|
|
|
|||
|
|
После добавления брокера выполняется перераспределение существующих партиций с учетом нового узла с помощью утилиты **kafka-reassign-partitions.sh**:
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
./kafka-reassign-partitions.sh --bootstrap-server <адреса_брокеров> --generate --topics-to-move-json-file topics.json --broker-list "0,1,2" --execute
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
: : : warning
|
|||
|
|
Операция перераспределения партиций требует времени и может создавать дополнительную нагрузку на кластер. Выполнение рекомендуется в период наименьшей активности.
|
|||
|
|
: : :
|