Порядок создания коннекторов¶
Коннектор «dh-collections-service-collection-connector»
Коннектор «dh-collections-service-collection-connector» предназначен для чтения таблиц:
- dh_collections;
- dh_collection_content;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-collections-service-collection-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-collections-service-collection",
"database.dbname": " <span><ENVIRONMENT_PREFIX></span>_dh_collections_db",
"slot.name": "dh_collections_slot",
"publication.name": "dh_collections_publication",
"table.include.list": "<SCHEMA>.dh_collections,<SCHEMA>.dh_collection_content,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-collections-service-collection",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).dh_collection(.*)",
"transforms.AddPrefix.replacement": " <span><ENVIRONMENT_PREFIX></span>-dh-es-dh-collections-service-collection",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_collections_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию « <ENVIRONMENT_PREFIX>_dh_collections_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
После выполнения данных шагов создание файла коннектора «dh-collections-service-collection-connector» будет завершено.
Коннектор «dh-documents-service-documents-connector»
Коннектор «dh-documents-service-documents-connector» предназначен для чтения таблиц:
- dh_documents;
- dh_compound_relations;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-documents-service-documents-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-documents-service-document",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_documents_db",
"slot.name": "dh_documents_slot",
"publication.name": "dh_documents_publication",
"table.include.list": "<SCHEMA>.dh_documents,<SCHEMA>.dh_compound_relations,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-documents-service-document",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"snapshot.mode": "always",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*)(.public.dh_documents|.public.dh_compound_relations)",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-documents-service-document",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-documents-service-document-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_documents_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_documents_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
Коннектор «dh-documents-service-documents-connector» содержит параметр «snapshot.mode» со значением «always». Необходимо сохранить данное значение, если требуется выполнить начальный снимок таблиц.
После выполнения данных шагов создание файла коннектора «dh-documents-service-documents-connector» будет завершено.
Коннектор «dh-documents-service-items-connector»
Коннектор «dh-documents-service-items-connector» предназначен для чтения таблиц:
- dh_items;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-documents-service-items-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-documents-service-items",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_documents_db",
"slot.name": "dh_items_slot",
"publication.name": "dh_items_publication",
"table.include.list": "<SCHEMA>.dh_items,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-documents-service-items",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"snapshot.mode": "always",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).public.dh_items",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-documents-service-item",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-documents-service-items-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_documents_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_documents_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
Коннектор «dh-documents-service-items-connector» содержит параметр «snapshot.mode» со значением «always». Необходимо сохранить данное значение, если требуется выполнить начальный снимок таблиц.
После выполнения данных шагов создание файла коннектора «dh-documents-service-items-connector» будет завершено.
Коннектор «dh-favorites-service-dhfavorite-connector»
Коннектор «dh-favorites-service-dhfavorite-connector» предназначен для чтения таблиц:
- dh_favorites;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-favorites-service-dhfavorite-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-favorites-service-dhfavorite",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_favorites_db",
"slot.name": "dh_favorites_slot",
"publication.name": "dh_favorites_publication",
"table.include.list": "<SCHEMA>.dh_favorites,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-favorites-service-dhfavorite",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).dh_favorites",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-favorites-service-dhfavorite",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-favorites-service-dhfavorite-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_favorites_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_favorites_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
После выполнения данных шагов создание файла коннектора «dh-favorites-service-dhfavorite-connector» будет завершено.
Коннектор «dh-folders-service-folder-connector»
Коннектор «dh-folders-service-folder-connector» предназначен для чтения таблиц:
- dh_folders;
- dh_folder_content;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-folders-service-folder-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-folders-service-folder",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_folders_db",
"slot.name": "dh_folders_slot",
"publication.name": "dh_folders_publication",
"table.include.list": "<SCHEMA>.dh_folders,<SCHEMA>.dh_folder_content,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-folders-service-folder",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.).public.dh_folder(.)",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-folders-service-folder",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-folders-service-folder-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_folders_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_folders_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
После выполнения данных шагов создание файла коннектора «dh-folders-service-folder-connector» будет завершено.
Коннектор «dh-relations-service-relation-connector»
Коннектор «dh-relations-service-relation-connector» предназначен для чтения таблиц:
- dh_relations;
- debezium_heartbeat.
Для создания коннектора необходимо выполнить следующие действия:
1. Создать JSON-файл с помощью команды:
nano dh-relations-service-relation-connector.json
2. Заполнить созданный файл с помощью шаблона:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-relations-service-relation",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_relations_db",
"slot.name": "dh_relations_slot",
"publication.name": "dh_relations_publication",
"table.include.list": "<SCHEMA>.dh_relations,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-relations-service-relation",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).public.dh_relations",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-relations-service-relation",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-relations-service-relation-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}
3. Изменить в заполненном с помощью шаблона файле:
- <адрес db> в «database.hostname» – на адрес PostgreSQL;
- <порт PostgreSQL> – на фактический порт PostgreSQL;
- <логин пользователя db> – на логин пользователя PostgreSQL;
- <пароль пользователя db> – на пароль пользователя PostgreSQL;
- <ENVIRONMENT_PREFIX>_dh_relations_db – на фактическое наименование БД, если используется не значение по умолчанию;
- <SCHEMA> – на фактическое имя схемы;
- <адрес kafka:port listener> – на адрес Apache Kafka;
- <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.
Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_relations_db».
4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.
5. При необходимости изменить поля:
- «slot.name» – если слот с указанным именем уже используется другим коннектором;
- «publication.name» – если публикация с таким именем уже существует;
- «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.
После выполнения данных шагов создание файла коннектора «dh-relations-service-relation-connector» будет завершено.