Порядок создания коннекторов

Коннектор «dh-collections-service-collection-connector»

Коннектор «dh-collections-service-collection-connector» предназначен для чтения таблиц:

  • dh_collections;
  • dh_collection_content;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-collections-service-collection-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",
  "tasks.max": "1",
  "database.hostname": "<адрес PostgreSQL>",
  "database.port": "<порт PostgreSQL>",
  "database.user": "<логин пользователя db>",
  "database.password": "<пароль пользователя db>",
  "database.server.name": "dh-collections-service-collection",
  "database.dbname": " <span>&lt;ENVIRONMENT_PREFIX&gt;</span>_dh_collections_db",
  "slot.name": "dh_collections_slot",
  "publication.name": "dh_collections_publication",
  "table.include.list": "<SCHEMA>.dh_collections,<SCHEMA>.dh_collection_content,<SCHEMA>.debezium_heartbeat",
  "topic.prefix": "dh-collections-service-collection",
  "publication.autocreate.mode": "filtered",
  "heartbeat.interval.ms": "300000",
  "heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
  "transforms": "AddPrefix",
  "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.AddPrefix.regex": "(.*).dh_collection(.*)",
  "transforms.AddPrefix.replacement": " <span>&lt;ENVIRONMENT_PREFIX&gt;</span>-dh-es-dh-collections-service-collection",
  "schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
  "schema.history.internal.kafka.topic": "dh-collections-service-collection-schema-history",
  "schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
  "schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_collections_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию « <ENVIRONMENT_PREFIX>_dh_collections_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

После выполнения данных шагов создание файла коннектора «dh-collections-service-collection-connector» будет завершено.

Коннектор «dh-documents-service-documents-connector»

Коннектор «dh-documents-service-documents-connector» предназначен для чтения таблиц:

  • dh_documents;
  • dh_compound_relations;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-documents-service-documents-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-documents-service-document",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_documents_db",
"slot.name": "dh_documents_slot",
"publication.name": "dh_documents_publication",
"table.include.list": "<SCHEMA>.dh_documents,<SCHEMA>.dh_compound_relations,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-documents-service-document",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"snapshot.mode": "always",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*)(.public.dh_documents|.public.dh_compound_relations)",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-documents-service-document",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-documents-service-document-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_documents_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_documents_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

Коннектор «dh-documents-service-documents-connector» содержит параметр «snapshot.mode» со значением «always». Необходимо сохранить данное значение, если требуется выполнить начальный снимок таблиц.

После выполнения данных шагов создание файла коннектора «dh-documents-service-documents-connector» будет завершено.

Коннектор «dh-documents-service-items-connector»

Коннектор «dh-documents-service-items-connector» предназначен для чтения таблиц:

  • dh_items;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-documents-service-items-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-documents-service-items",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_documents_db",
"slot.name": "dh_items_slot",
"publication.name": "dh_items_publication",
"table.include.list": "<SCHEMA>.dh_items,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-documents-service-items",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"snapshot.mode": "always",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).public.dh_items",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-documents-service-item",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-documents-service-items-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_documents_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_documents_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

Коннектор «dh-documents-service-items-connector» содержит параметр «snapshot.mode» со значением «always». Необходимо сохранить данное значение, если требуется выполнить начальный снимок таблиц.

После выполнения данных шагов создание файла коннектора «dh-documents-service-items-connector» будет завершено.

Коннектор «dh-favorites-service-dhfavorite-connector»

Коннектор «dh-favorites-service-dhfavorite-connector» предназначен для чтения таблиц:

  • dh_favorites;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-favorites-service-dhfavorite-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-favorites-service-dhfavorite",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_favorites_db",
"slot.name": "dh_favorites_slot",
"publication.name": "dh_favorites_publication",
"table.include.list": "<SCHEMA>.dh_favorites,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-favorites-service-dhfavorite",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).dh_favorites",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-favorites-service-dhfavorite",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-favorites-service-dhfavorite-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_favorites_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_favorites_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

После выполнения данных шагов создание файла коннектора «dh-favorites-service-dhfavorite-connector» будет завершено.

Коннектор «dh-folders-service-folder-connector»

Коннектор «dh-folders-service-folder-connector» предназначен для чтения таблиц:

  • dh_folders;
  • dh_folder_content;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-folders-service-folder-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-folders-service-folder",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_folders_db",
"slot.name": "dh_folders_slot",
"publication.name": "dh_folders_publication",
"table.include.list": "<SCHEMA>.dh_folders,<SCHEMA>.dh_folder_content,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-folders-service-folder",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.).public.dh_folder(.)",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-folders-service-folder",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-folders-service-folder-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_folders_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_folders_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

После выполнения данных шагов создание файла коннектора «dh-folders-service-folder-connector» будет завершено.

Коннектор «dh-relations-service-relation-connector»

Коннектор «dh-relations-service-relation-connector» предназначен для чтения таблиц:

  • dh_relations;
  • debezium_heartbeat.

Для создания коннектора необходимо выполнить следующие действия:

1. Создать JSON-файл с помощью команды:

nano dh-relations-service-relation-connector.json

2. Заполнить созданный файл с помощью шаблона:

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "<адрес PostgreSQL>",
"database.port": "<порт PostgreSQL>",
"database.user": "<логин пользователя db>",
"database.password": "<пароль пользователя db>",
"database.server.name": "dh-relations-service-relation",
"database.dbname": "<span><ENVIRONMENT_PREFIX></span>_dh_relations_db",
"slot.name": "dh_relations_slot",
"publication.name": "dh_relations_publication",
"table.include.list": "<SCHEMA>.dh_relations,<SCHEMA>.debezium_heartbeat",
"topic.prefix": "dh-relations-service-relation",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "300000",
"heartbeat.action.query": "INSERT INTO <SCHEMA>.debezium_heartbeat (id, heartbeat_ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_ts=EXCLUDED.heartbeat_ts;",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "(.*).public.dh_relations",
"transforms.AddPrefix.replacement": "<span><ENVIRONMENT_PREFIX></span>-dh-es-dh-relations-service-relation",
"schema.history.internal.kafka.bootstrap.servers": "<адрес kafka:port listener>",
"schema.history.internal.kafka.topic": "dh-relations-service-relation-schema-history",
"schema.history.internal.producer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>",
"schema.history.internal.consumer.security.protocol": "<PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных>"
}

3. Изменить в заполненном с помощью шаблона файле:

  • <адрес db> в «database.hostname» – на адрес PostgreSQL;
  • <порт PostgreSQL> – на фактический порт PostgreSQL;
  • <логин пользователя db> – на логин пользователя PostgreSQL;
  • <пароль пользователя db> – на пароль пользователя PostgreSQL;
  • <ENVIRONMENT_PREFIX>_dh_relations_db – на фактическое наименование БД, если используется не значение по умолчанию;
  • <SCHEMA> – на фактическое имя схемы;
  • <адрес kafka:port listener> – на адрес Apache Kafka;
  • <PLAINTEXT|SSL|SASL_PLAINTEXT|SASL_SSL - один из перечисленных> – изменить тип аутентификации Apache Kafka на фактический протокол Kafka schema history.

Если для коннектора не используется отдельное наименование БД, необходимо указать значение по умолчанию «<ENVIRONMENT_PREFIX>_dh_relations_db».

4. Добавить блок Kafka schema history и аутентификации, добавление данного блока описано в разделе Порядок создания коннектора.

5. При необходимости изменить поля:

  • «slot.name» – если слот с указанным именем уже используется другим коннектором;
  • «publication.name» – если публикация с таким именем уже существует;
  • «schema.history.internal.kafka.topic» – если топик истории схемы уже используется.

После выполнения данных шагов создание файла коннектора «dh-relations-service-relation-connector» будет завершено.