Создание коннектора¶
После настройки PostgreSQL и Apache Kafka следует приступить к созданию коннектора. Для этого необходимо выполнить следующие действия:
1. Перед созданием любого коннектора определить следующие параметры:
- адрес PostgreSQL;
- порт PostgreSQL;
- логин пользователя PostgreSQL;
- пароль пользователя PostgreSQL;
- имя базы данных;
- Kafka bootstrap servers;
- префикс окружения <ENVIRONMENT_PREFIX>;
- схему PostgreSQL, если это не public;
- тип аутентификации Apache Kafka: «NONE», «PLAIN» или «OAUTH2»;
- используется ли SSL для Apache Kafka;
- используется ли truststore;
- используется ли keystore.
2. Создать JSON-файл коннектора. Пример команды для «dh-collections-service» сервиса:
nano dh-collections-service-collection-connector.json
3. Заполнить созданный файл шаблоном нужного коннектора. Шаблоны приведены далее в формате чистого объекта конфигурации, без указания полей «name» и «config».
4. Заполнить данные для подключения к PostgreSQL:
- database.hostname;
- database.port;
- database.user;
- database.password;
- database.dbname.
5. Заполнить данные для подключения к Apache Kafka. Необходимо добавить файл коннектора и заполнить параметры:
- schema.history.internal.kafka.bootstrap.servers;
- schema.history.internal.kafka.topic;
- schema.history.internal.producer.security.protocol;
- schema.history.internal.consumer.security.protocol.
При необходимости добавить поля:
- «SSL»;
- «SASL/PLAIN»;
- «SASL/OAUTHBEARER»;
- «truststore» – при использовании Keycloak по SSL.
6. Заполнить данные конкретного коннектора:
- table.include.list;
- heartbeat.action.query;
- transforms.AddPrefix.replacement.
7. Отправить конфигурацию в Kafka Connect с помощью команды:
curl -X PUT http://CONNECT_HOST:8083/connectors/<CONNECTOR_NAME>/config
-H 'Content-Type: application/json'
-d @<CONNECTOR_FILE>.json
8. Проверить статус работы коннектора в Kafka Connect с помощью команды:
curl http://CONNECT_HOST:8083/connectors/<CONNECTOR_NAME>/status
9. Указать общие параметры коннекторов.
Параметры для подключения к PostgreSQL:
- database.hostname – адрес PostgreSQL;
- database.port – порт PostgreSQL;
- database.user – логин пользователя PostgreSQL;
- database.password – пароль пользователя PostgreSQL;
- database.dbname – имя базы данных.
В случае, когда несколько коннекторов используют одну базу данных, параметр «database.dbname» для них совпадает.
Параметры для Debezium/PostgreSQL:
- «connector.class» – указать «io.debezium.connector.postgresql.PostgresConnector»;
- «plugin.name» – указать «pgoutput»;
- «tasks.max» – указать «1»;
- «database.server.name» – логическое имя источника Debezium;
- «slot.name» – наименование слота репликации, должно быть уникальным для каждой публикации;
- «publication.name» – наименование публикации, должно быть уникальным для каждого коннектора;
- «table.include.list» – список таблиц, читаемых коннектором;
- «topic.prefix» – логический префикс Debezium;
- «publication.autocreate.mode» – «filtered»;
- «snapshot.mode» – указывается только для коннекторов, требующих initial snapshot.
Параметры для Heartbeat:
- «heartbeat.interval.ms» – интервал heartbeat;
- «heartbeat.action.query» – SQL для heartbeat.
Если таблицы расположены не в схеме «public», необходимо указать актуальное имя схемы в параметрах «table.include.list» и «heartbeat.action.query».
Параметры для Topic transform:
- transforms – указать значение «AddPrefix»;
- transforms.AddPrefix.type – указать значение «org.apache.kafka.connect.transforms.RegexRouter»;
- transforms.AddPrefix.regex – указать регулярное выражение для замены наименования топика;
- transforms.AddPrefix.replacement – указать итоговое наименование CDC-топиков.
10. Добавить в JSON-файл коннектора раздел истории Apache Kafka и аутентификации Apache Kafka.
В конфигурацию каждого коннектора необходимо включить параметры доступа к Apache Kafka, через которую Debezium осуществляет чтение и запись схемы истории топика.
Необходимо указать обязательные поля при любом способе аутентификации:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "<уникальная schema history topic>",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL>"
Описание параметров:
- «schema.history.internal.kafka.bootstrap.servers» – адрес Kafka broker listener в формате «host:port»;
- «schema.history.internal.kafka.topic» – отдельный топик истории схем для данного коннектора;
- «schema.history.internal.producer.security.protocol» – тип подключения producer к Apache Kafka;
- «schema.history.internal.consumer.security.protocol» – тип подключения consumer к Apache Kafka.
Выбор протокола безопасности необходимо осуществлять в зависимости от наличия шифрования и аутентификации Apache Kafka:
- Для Apache Kafka без шифрования и без аутентификации – PLAINTEXT;
- Для Apache Kafka с шифрованием, но без аутентификации – SSL;
- Для Apache Kafka без шифрования, но с аутентификацией – SASL_PLAINTEXT;
- Для Apache Kafka с шифрованием и с аутентификацией – SASL_SSL.
Рекомендуется указывать одинаковые значения для параметров «producer.security.protocol» и «consumer.security.protocol».
Варианты типовых конфигураций:
- Для Apache Kafka без аутентификации и без SSL. Необходимо использовать, если Kafka listener принимает обычное незащищенное подключение без логина и без TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "PLAINTEXT"
- Для Apache Kafka без аутентификации, с SSL. Необходимо использовать, если Kafka listener требует TLS, но не требует логин/пароль или OAuth:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SSL",
"schema.history.internal.consumer.security.protocol": "SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3"
- При использовании truststore. Если сертификат Apache Kafka осуществляет проверку через truststore, необходимо добавить поля:
"schema.history.internal.producer.ssl.truststore.location": "/opt/kafka/certs/<truststore file>",
"schema.history.internal.producer.ssl.truststore.password": "<пароль truststore>",
"schema.history.internal.producer.ssl.truststore.type": "JKS",
"schema.history.internal.consumer.ssl.truststore.location": "/opt/kafka/certs/<truststore file>",
"schema.history.internal.consumer.ssl.truststore.password": "<пароль truststore>",
"schema.history.internal.consumer.ssl.truststore.type": "JKS"
- При использовании keystore. Если Apache Kafka требует клиентский сертификат, необходимо добавить поля:
"schema.history.internal.producer.ssl.keystore.location": "/opt/kafka/certs/<keystore file>",
"schema.history.internal.producer.ssl.keystore.password": "<пароль keystore>",
"schema.history.internal.producer.ssl.keystore.type": "JKS",
"schema.history.internal.producer.ssl.key.password": "<пароль ключа>",
"schema.history.internal.consumer.ssl.keystore.location": "/opt/kafka/certs/<keystore file>",
"schema.history.internal.consumer.ssl.keystore.password": "<пароль keystore>",
"schema.history.internal.consumer.ssl.keystore.type": "JKS",
"schema.history.internal.consumer.ssl.key.password": "<пароль ключа>"
- Для Apache Kafka с PLAIN без SSL. Необходимо использовать, если Kafka listener требует логин/пароль по SASL/PLAIN, но не использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";"
- Для Apache Kafka с PLAIN и SSL. Необходимо использовать, если Kafka listener требует логин/пароль по SASL/PLAIN и одновременно использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";"
Если Apache Kafka требует truststore или keystore, необходимо добавить соответствующие поля из примера конфигурации для Apache Kafka без аутентификации, с SSL.
- Для Apache Kafka с OAuth 2 без SSL. Необходимо использовать, если Kafka listener требует OAUTHBEARER, но не использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.producer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.producer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>",
"schema.history.internal.consumer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.consumer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.consumer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>"
- Для Apache Kafka с OAuth 2 и SSL. Необходимо использовать, если Kafka listener требует OAUTHBEARER и одновременно использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.producer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.producer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.producer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>",
"schema.history.internal.consumer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.consumer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.consumer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>"
Если Apache Kafka требует truststore или keystore, необходимо добавить соответствующие поля из примера конфигурации для Kafka без аутентификации, с SSL.
После выполнения данных шагов коннектор будет создан и настроен.