Создание коннектора

После настройки PostgreSQL и Apache Kafka следует приступить к созданию коннектора. Для этого необходимо выполнить следующие действия:

1. Перед созданием любого коннектора определить следующие параметры:

  • адрес PostgreSQL;
  • порт PostgreSQL;
  • логин пользователя PostgreSQL;
  • пароль пользователя PostgreSQL;
  • имя базы данных;
  • Kafka bootstrap servers;
  • префикс окружения <ENVIRONMENT_PREFIX>;
  • схему PostgreSQL, если это не public;
  • тип аутентификации Apache Kafka: «NONE», «PLAIN» или «OAUTH2»;
  • используется ли SSL для Apache Kafka;
  • используется ли truststore;
  • используется ли keystore.

2. Создать JSON-файл коннектора. Пример команды для «dh-collections-service» сервиса:

nano dh-collections-service-collection-connector.json

3. Заполнить созданный файл шаблоном нужного коннектора. Шаблоны приведены далее в формате чистого объекта конфигурации, без указания полей «name» и «config».

4. Заполнить данные для подключения к PostgreSQL:

  • database.hostname;
  • database.port;
  • database.user;
  • database.password;
  • database.dbname.

5. Заполнить данные для подключения к Apache Kafka. Необходимо добавить файл коннектора и заполнить параметры:

  • schema.history.internal.kafka.bootstrap.servers;
  • schema.history.internal.kafka.topic;
  • schema.history.internal.producer.security.protocol;
  • schema.history.internal.consumer.security.protocol.

При необходимости добавить поля:

  • «SSL»;
  • «SASL/PLAIN»;
  • «SASL/OAUTHBEARER»;
  • «truststore» – при использовании Keycloak по SSL.

6. Заполнить данные конкретного коннектора:

  • table.include.list;
  • heartbeat.action.query;
  • transforms.AddPrefix.replacement.

7. Отправить конфигурацию в Kafka Connect с помощью команды:

curl -X PUT http://CONNECT_HOST:8083/connectors/<CONNECTOR_NAME>/config
-H 'Content-Type: application/json'
-d @<CONNECTOR_FILE>.json

8. Проверить статус работы коннектора в Kafka Connect с помощью команды:

curl http://CONNECT_HOST:8083/connectors/<CONNECTOR_NAME>/status

9. Указать общие параметры коннекторов.

Параметры для подключения к PostgreSQL:

  • database.hostname – адрес PostgreSQL;
  • database.port – порт PostgreSQL;
  • database.user – логин пользователя PostgreSQL;
  • database.password – пароль пользователя PostgreSQL;
  • database.dbname – имя базы данных.

В случае, когда несколько коннекторов используют одну базу данных, параметр «database.dbname» для них совпадает.

Параметры для Debezium/PostgreSQL:

  • «connector.class» – указать «io.debezium.connector.postgresql.PostgresConnector»;
  • «plugin.name» – указать «pgoutput»;
  • «tasks.max» – указать «1»;
  • «database.server.name» – логическое имя источника Debezium;
  • «slot.name» – наименование слота репликации, должно быть уникальным для каждой публикации;
  • «publication.name» – наименование публикации, должно быть уникальным для каждого коннектора;
  • «table.include.list» – список таблиц, читаемых коннектором;
  • «topic.prefix» – логический префикс Debezium;
  • «publication.autocreate.mode» – «filtered»;
  • «snapshot.mode» – указывается только для коннекторов, требующих initial snapshot.

Параметры для Heartbeat:

  • «heartbeat.interval.ms» – интервал heartbeat;
  • «heartbeat.action.query» – SQL для heartbeat.

Если таблицы расположены не в схеме «public», необходимо указать актуальное имя схемы в параметрах «table.include.list» и «heartbeat.action.query».

Параметры для Topic transform:

  • transforms – указать значение «AddPrefix»;
  • transforms.AddPrefix.type – указать значение «org.apache.kafka.connect.transforms.RegexRouter»;
  • transforms.AddPrefix.regex – указать регулярное выражение для замены наименования топика;
  • transforms.AddPrefix.replacement – указать итоговое наименование CDC-топиков.

10. Добавить в JSON-файл коннектора раздел истории Apache Kafka и аутентификации Apache Kafka.

В конфигурацию каждого коннектора необходимо включить параметры доступа к Apache Kafka, через которую Debezium осуществляет чтение и запись схемы истории топика.

Необходимо указать обязательные поля при любом способе аутентификации:

"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "<уникальная schema history topic>",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL>"

Описание параметров:

  • «schema.history.internal.kafka.bootstrap.servers» – адрес Kafka broker listener в формате «host:port»;
  • «schema.history.internal.kafka.topic» – отдельный топик истории схем для данного коннектора;
  • «schema.history.internal.producer.security.protocol» – тип подключения producer к Apache Kafka;
  • «schema.history.internal.consumer.security.protocol» – тип подключения consumer к Apache Kafka.

Выбор протокола безопасности необходимо осуществлять в зависимости от наличия шифрования и аутентификации Apache Kafka:

  • Для Apache Kafka без шифрования и без аутентификации – PLAINTEXT;
  • Для Apache Kafka с шифрованием, но без аутентификации – SSL;
  • Для Apache Kafka без шифрования, но с аутентификацией – SASL_PLAINTEXT;
  • Для Apache Kafka с шифрованием и с аутентификацией – SASL_SSL.

Рекомендуется указывать одинаковые значения для параметров «producer.security.protocol» и «consumer.security.protocol».

Варианты типовых конфигураций:

  • Для Apache Kafka без аутентификации и без SSL. Необходимо использовать, если Kafka listener принимает обычное незащищенное подключение без логина и без TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "PLAINTEXT"
  • Для Apache Kafka без аутентификации, с SSL. Необходимо использовать, если Kafka listener требует TLS, но не требует логин/пароль или OAuth:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SSL",
"schema.history.internal.consumer.security.protocol": "SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3"
  • При использовании truststore. Если сертификат Apache Kafka осуществляет проверку через truststore, необходимо добавить поля:
"schema.history.internal.producer.ssl.truststore.location": "/opt/kafka/certs/<truststore file>",
"schema.history.internal.producer.ssl.truststore.password": "<пароль truststore>",
"schema.history.internal.producer.ssl.truststore.type": "JKS",
"schema.history.internal.consumer.ssl.truststore.location": "/opt/kafka/certs/<truststore file>",
"schema.history.internal.consumer.ssl.truststore.password": "<пароль truststore>",
"schema.history.internal.consumer.ssl.truststore.type": "JKS"
  • При использовании keystore. Если Apache Kafka требует клиентский сертификат, необходимо добавить поля:
"schema.history.internal.producer.ssl.keystore.location": "/opt/kafka/certs/<keystore file>",
"schema.history.internal.producer.ssl.keystore.password": "<пароль keystore>",
"schema.history.internal.producer.ssl.keystore.type": "JKS",
"schema.history.internal.producer.ssl.key.password": "<пароль ключа>",
"schema.history.internal.consumer.ssl.keystore.location": "/opt/kafka/certs/<keystore file>",
"schema.history.internal.consumer.ssl.keystore.password": "<пароль keystore>",
"schema.history.internal.consumer.ssl.keystore.type": "JKS",
"schema.history.internal.consumer.ssl.key.password": "<пароль ключа>"
  • Для Apache Kafka с PLAIN без SSL. Необходимо использовать, если Kafka listener требует логин/пароль по SASL/PLAIN, но не использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";"
  • Для Apache Kafka с PLAIN и SSL. Необходимо использовать, если Kafka listener требует логин/пароль по SASL/PLAIN и одновременно использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";"

Если Apache Kafka требует truststore или keystore, необходимо добавить соответствующие поля из примера конфигурации для Apache Kafka без аутентификации, с SSL.

  • Для Apache Kafka с OAuth 2 без SSL. Необходимо использовать, если Kafka listener требует OAUTHBEARER, но не использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.producer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.producer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>",
"schema.history.internal.consumer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.consumer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.consumer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>"
  • Для Apache Kafka с OAuth 2 и SSL. Необходимо использовать, если Kafka listener требует OAUTHBEARER и одновременно использует TLS:
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.consumer.ssl.enabled.protocols": "TLSv1.2,TLSv1.3",
"schema.history.internal.producer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.producer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.producer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.producer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>",
"schema.history.internal.consumer.sasl.mechanism": "OAUTHBEARER",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;",
"schema.history.internal.consumer.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
"schema.history.internal.consumer.sasl.oauthbearer.token.endpoint.url": "https://<адрес keycloak>/realms/<realm>/protocol/openid-connect/token",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.id": "<KAFKA_KEYCLOAK_CLIENT_ID>",
"schema.history.internal.consumer.sasl.oauthbearer.client.credentials.client.secret": "<KAFKA_KEYCLOAK_CLIENT_SECRET>"

Если Apache Kafka требует truststore или keystore, необходимо добавить соответствующие поля из примера конфигурации для Kafka без аутентификации, с SSL.

После выполнения данных шагов коннектор будет создан и настроен.