Асинхронне завантаження даних

🌐 Цей документ доступний українською та англійською мовами. Використовуйте перемикач у правому верхньому куті, щоб змінити версію.

1. Загальний опис

В поточній імплементації існує механізм який дозволяє завантажити csv файл з даним які будуть завантажені до відповідної таблиці. Цей процес відбувається синхронно, тому відповідь клієнт має отримати за 30 секунд (максимальний допустимий таймаут). Окрім того оскільки комунікація між сервісами синхронного та асинхронного управління даними реєстру відбувається через брокер повідомлень, існує ліміт в один мегабайт для повідомлень. Тому було встановлено штучний ліміт в 50 рядків. В процесі експлуатації виникла необхідність завантажувати більші об’єми даних, за розміром завантаження яких може займати значно довший час.

2. Функціональні сценарії

  • Збереження даних з файлу який більше одного мегабайта і збереження якого може тривати довше ніж 30 секунд.

  • Збереження даних з файлу у декілька таблиць.

3. Ролі користувачів

  • Розробник регламенту

  • Надавач послуг

4. Загальні принципи та положення

  • Збереження даних з файлу відбуваються в одній транзакції.

  • Опрацювання файлу відбувається по рядках.

  • Бізнес процес будується на подіях які виникають при відправці повідомлення до брокера.

  • Події завантаження в бізнес процес мають обробляться головним процесом і не можуть бути проасоційовані із саб-процесом.

  • Операція збереження відбувається асинхронно.

  • Прогрес завантаження не відслідковується.

  • В якості посилання на похідний документ для всіх сутностей буде один ключ на весь файл без вказання на конкретний рядок.

  • Фали що не відповідають правилам встановленим моделювальникам не можуть бути завантажені.

  • Обмеження по розміру файлу керується на рівні control plane

6. Моделювання регламенту реєстру

6.1. Розширення для моделювання

Для реалізації можливості асинхронного завантаження сутностей до БД, конфігурація складається з декілька частин:

Конфігурація на рівні моделі даних за допомогою розширення liquibase, моделювання форми по завантаженню файлів та використання делегату асинхронної взаємодії при моделюванні БП.

Розширення бібліотеки liquibase
<changeSet>
    <createTable name="item">
        <!-- Опис полів таблиці !-->
    </createTable>
    <createTable name="demo_entity">
        <!-- Опис полів таблиці !-->
    </createTable>

    <createCompositeEntity name="item_with_references">
        <!-- Опис полів складної сутності !-->
    </createCompositeEntity>

    <createAsyncLoad name="allowedAsyncLoads">
        <entityList>
            <entity name="item" limit="100"/>
            <entity name="item_with_references" limit="1000"/>
            <entity name="demo_entity" limit="1000000"/>
        </entityList>
    </createAsyncLoad>

    <deleteAsyncLoad name="removeEntities">
        <entityList>
            <entity name="demo_entity"/>
        </entityList>
    </deleteAsyncLoad>

</changeSet>

Атрибут limit є обовʼязковим при створенні createAsyncLoad

business process

Приклад 1. Конфігурація делегату для асинхронного завантаження

delegateConfiguration

В результаті обробки, можливе виникнення декількох подій, в залежності від статусу результату. Тип події складається з назви сутності та статусу.

Приклад 2. Приклади налаштування обробки подій успішного завантаження сутності item

succesEvent

Приклад 3. Приклади налаштування обробки подій для при збереженні сутності item

constraintViolation

Загальне правило для формування подій при асинхронній взаємодії формується за допомогою camel case і складається з назви сутності над якою здійснюється операція + назва операція + результат операції

Таблиця 1. Можливі статуси результату опрацювання
Результат операції Опис Приклад події на бізнес процесі

SUCCESS

Операція закінчилась успішно.

%item%DataLoadCsvSuccess

CONSTRAINT_VIOLATION

Дані з файлу не можуть бути завантаженні оскільки один з них порушує існуючі правила БД.

%item%DataLoadCsvConstraintViolation

OPERATION_FAILED

Під час опрацювання файлу виникла помилка.

%item%DataLoadCsvOperationFailed

7. Низькорівневий дизайн сервісів

7.1. Бібліотека Liquibase-розширень для моделювання дата моделі реєстру

Результатом обробки тегів createAsyncLoad deleteAsyncLoad є формування переліку структур для яких дозволено асинхронне завантаження даних з файлів в таблиці метаданих.

7.2. Делегат для відправки асинхронних повідомлень

При відправці повідомлення за допомогою делегата, разом з тілом повідомлення відправляються службові заголовки для трасування.

Поля делегата які заповнюються при моделюванні.

Назва сутності - назва обʼєкту дата моделі (таблиця або складний обʼєкт)
Файл - структура яка представляє файл і складається з ключа до файлу і чексуми.
Підпис - структура яка представляє форму яка була підписана з вмістом файлу.
Похідний файл (опційно) - структура яка представляє файл, який був створений в бізнес процесі або в результаті опрацювання оригінального файлу.
Змінна - Назва змінної в яку буде збережено результат обробки файлу.
JWT токен - токен користувача.

Приклад тіла повідомлення для збереження даних з файлу
{
  "payload": {
    "file": {
      "checksum": "....",
      "id": "process/bp-instance-id/uuid"
    },
    "derivedFile": {
      "checksum": "...",
      "id": "process/bp-instance-id/uuid"
    }
  }
}

Всі метадані до повідомлення передаються в заголовках до повідомлення разом з типовими для БП "X-Digital-*" заголовками.

X-Digital-Signature - користувацький підпис.
X-Digital-Signature-Derived - підпис який генерується на підставі фінального повідомлення.
EntityName - назва обʼєкту дата моделі.
ResultVariable - назва персистеної змінної в яку буде збережено результат обробки файлу.

7.3. Сервіс синхронного управління даними реєстру

Валідація відбувається згідно існуючого процесу за рахунок проксювання запитів до сервісу синхронного управління даними, правила щодо дозволеної кількості сутностей виставлених моделювальником формується на етапі генерації сервісу.

7.4. Сервіс асинхронного управління даними реєстру

Процес обробки повідомлення здійснюється існуючими обробниками для збереження сутностей (createEntity, createCompositeEntity) який обирається динамічно по тупи сутності в залежності від значення поля entityName, формування переліку маршрутизації entityName до обробника відбувається на етапі генерації.

Результатом обробки буде статус та деталі до повідомлення.

{
  "status": "SUCCESS",
  "details": "OK"
}
{
  "status": "CONSTRAINT_VIOLATION",
  "details": "error: {%s} in line: {%d}"
}

Текст з помилки про порушення правил БД, береться з процедури, а номер рядка за рахунок ведення лічильника в середині транзакції.

7.5. Обробник повідомлень подій результатів завантаження даних для сервісу виконання бізнес-процесів

Кореляція результату з бізнес процесом відбувається за рахунок BusinessProcessInstanceId з контексту. А тип повідомлення формується динамічно на підставі типу сутності та результату.

Приклад можливої кореляції
@Component
public class AsyncDataLoadResponseKafkaListener {
    private static final String ACTION = "DataLoadCsv";
    @Autowired
    private RuntimeService runtimeService;

    @KafkaListener("data-load.csv.outbound")
    public void processAsyncMessages(
            @Payload AsyncDataLoadResponse message,
            MessageHeaders headers) {
        AsyncDataLoadResult payload = message.geyPayload();

        RequestContext requestContext = message.getRequestContext();
        Result result = new Result(message.getStatus(), message.getDetails());
        runtimeService.createMessageCorrelation(payload.getEntityName() + ACTION + message.getStatus())
          .processInstanceId(requestContext.getProcessInstanceId())
          .setVariable(payload.getResultVariable(), result)
          .correlate();
    }

}

8. Високорівневий план розробки

8.2. План розробки

  • Створення нової форми для завантаження даних з CSV файлів

  • Розширення бібліотека Liquibase додатковими тегами.

  • Розробка нового делегату для відправки асинхронних повідомлень.

  • Розширення сервісу асинхронного управління даними реєстру для роботи з повідомленнями про завантаження даних.

  • Розширення сервісу виконання бізнес-процесів компонентою для обробки вхідних повідомлень.

  • Розробка реферетного прикладу БП.

  • Зміна існуючої форми в частині необхідності вказання сутності для валідації (поле стає не обовʼязковим і валідація здійснюється тільки при наявності значення в цьому полі)

  • Розширення можливості збереження файлів CSV як файлів в сервісах управління даними реєстру