Asynchronous data loading

1. General description

The current implementation provides a mechanism for uploading a CSV file whose data is saved to the corresponding table. This process is synchronous, so the client must receive a response within 30 seconds (the maximum allowed timeout). In addition, because the synchronous and asynchronous registry data management services communicate through a message broker, messages are limited to one megabyte. For these reasons, an artificial limit of 50 lines was set. In practice, it has become necessary to load larger volumes of data, which can take much longer to process.

2. Functional scenarios

  • Saving data from a file that is larger than one megabyte and may take longer than 30 seconds to save.

  • Saving data from a file to several tables.

3. User roles

  • Developer of regulations

  • Service provider

4. General provisions

  • Saving data from a file takes place in one transaction.

  • The file is processed line by line.

  • The business process is built on events that occur when a message is sent to the broker.

  • Events delivered to a business process must be handled by the main process; they cannot be bound to a subprocess.

  • The save operation is asynchronous.

  • Upload progress is not tracked.

  • All entities loaded from the file reference the derived document by a single key for the entire file, without pointing to a specific line.

  • Files that do not comply with the rules set by the modeler cannot be loaded.

  • The file size limit is managed at the control plane level.

6. Modeling the registry regulations

6.1. Modeling extension

To enable asynchronous loading of entities into the database, the configuration consists of several parts:

  • configuration at the data model level using the Liquibase extension;

  • modeling the file upload form;

  • using the asynchronous interaction delegate when modeling the business process.

An extension of the Liquibase library
<changeSet>
    <createTable name="item">
        <!-- Description of table fields !-->
    </createTable>
    <createTable name="demo_entity">
        <!-- Description of table fields !-->
    </createTable>

    <createCompositeEntity name="item_with_references">
        <!-- Description of the fields of a complex entity !-->
    </createCompositeEntity>

    <createAsyncLoad name="allowedAsyncLoads">
        <entityList>
            <entity name="item" limit="100"/>
            <entity name="item_with_references" limit="1000"/>
            <entity name="demo_entity" limit="1000000"/>
        </entityList>
    </createAsyncLoad>

    <deleteAsyncLoad name="removeEntities">
        <entityList>
            <entity name="demo_entity"/>
        </entityList>
    </deleteAsyncLoad>

</changeSet>

The limit attribute is required for every entity listed in createAsyncLoad.
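The rule above could be enforced while reading the changeset. The following is a minimal sketch, assuming the changeset is available as an XML string and using only the JDK's built-in DOM parser. The class AsyncLoadLimits and its method are hypothetical names, not the actual Liquibase extension, which may validate the tag differently.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper: collects createAsyncLoad entities and enforces the required limit attribute.
final class AsyncLoadLimits {
    private AsyncLoadLimits() {}

    static Map<String, Integer> parseCreateAsyncLoad(String changeSetXml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(changeSetXml.getBytes(StandardCharsets.UTF_8)));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Invalid changeSet XML", e);
        }

        Map<String, Integer> limits = new LinkedHashMap<>();
        NodeList loads = doc.getElementsByTagName("createAsyncLoad");
        for (int i = 0; i < loads.getLength(); i++) {
            // Only entities nested in createAsyncLoad need a limit; deleteAsyncLoad entries do not
            NodeList entities = ((Element) loads.item(i)).getElementsByTagName("entity");
            for (int j = 0; j < entities.getLength(); j++) {
                Element entity = (Element) entities.item(j);
                String name = entity.getAttribute("name");
                String limit = entity.getAttribute("limit"); // "" when the attribute is absent
                if (limit.isEmpty()) {
                    throw new IllegalArgumentException("createAsyncLoad: limit is required for entity " + name);
                }
                limits.put(name, Integer.parseInt(limit));
            }
        }
        return limits;
    }
}
```

Reading the example changeset this way yields item=100, item_with_references=1000, and demo_entity=1000000, while an entity inside createAsyncLoad without a limit is rejected.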

Business process

Example 1. Configuring a delegate for asynchronous loading

Processing can produce one of several events, depending on the result status. The event name is made up of the entity name and the status.

Example 2. Configuring handling of the event for successful loading of the item entity

Example 3. Configuring handling of the constraint-violation event when saving the item entity

Event names for asynchronous interaction follow a common rule: they are written in camel case and consist of the name of the entity the operation is performed on, followed by the name of the operation and the result of the operation (for example, itemDataLoadCsvSuccess).
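The naming rule can be expressed as a small function. This is a sketch, assuming the statuses are the upper-case values listed in Table 1 and the operation name is DataLoadCsv; the class EventNames and its method are illustrative, not part of the platform.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

// Illustrative helper: builds a camelCase event name as entity + operation + result.
final class EventNames {
    static final String DATA_LOAD_CSV = "DataLoadCsv";

    private EventNames() {}

    // e.g. ("item", "DataLoadCsv", "CONSTRAINT_VIOLATION") -> "itemDataLoadCsvConstraintViolation"
    static String eventName(String entityName, String operation, String status) {
        // Turn an upper-case status such as OPERATION_FAILED into OperationFailed
        String result = Arrays.stream(status.toLowerCase(Locale.ROOT).split("_"))
                .filter(part -> !part.isEmpty())
                .map(part -> Character.toUpperCase(part.charAt(0)) + part.substring(1))
                .collect(Collectors.joining());
        return entityName + operation + result;
    }
}
```

Converting the status explicitly matters: concatenating the raw status would give itemDataLoadCsvSUCCESS instead of itemDataLoadCsvSuccess.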

Table 1. Possible statuses of the processing result

| Operation result | Description | Example of a business process event |
|---|---|---|
| SUCCESS | The operation completed successfully. | %item%DataLoadCsvSuccess |
| CONSTRAINT_VIOLATION | The data from the file cannot be loaded because at least one record violates the existing database rules. | %item%DataLoadCsvConstraintViolation |
| OPERATION_FAILED | An error occurred while processing the file. | %item%DataLoadCsvOperationFailed |

7. Low-level service design

7.1. Liquibase extension library for modeling the registry data model

Processing the createAsyncLoad and deleteAsyncLoad tags produces, in the metadata table, the list of structures for which asynchronous loading of data from files is allowed.
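A minimal in-memory model of this result, assuming the metadata table can be viewed as a map from entity name to its row limit: createAsyncLoad adds or updates entries, and deleteAsyncLoad removes them. The class AsyncLoadMetadata is hypothetical and does not reflect the real metadata table schema.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory view of the metadata table: entity name -> maximum number of rows per file.
final class AsyncLoadMetadata {
    private final Map<String, Integer> allowed = new LinkedHashMap<>();

    // Effect of <entity name=".." limit=".."/> inside createAsyncLoad
    void createAsyncLoad(String entityName, int limit) {
        allowed.put(entityName, limit);
    }

    // Effect of <entity name=".."/> inside deleteAsyncLoad
    void deleteAsyncLoad(String entityName) {
        allowed.remove(entityName);
    }

    boolean isAsyncLoadAllowed(String entityName) {
        return allowed.containsKey(entityName);
    }

    Map<String, Integer> allowedEntities() {
        return Collections.unmodifiableMap(allowed);
    }
}
```

Applying the changeset from section 6.1 in order (three createAsyncLoad entries, then deleteAsyncLoad for demo_entity) leaves only item and item_with_references allowed.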

7.2. Delegate for sending asynchronous messages

When the delegate sends a message, service headers for tracing are sent along with the message body.

Delegate fields filled in during modeling:

Entity name - the name of the data model object (a table or a composite entity).
File - a structure that represents the file; it consists of the file key and a checksum.
Signature - a structure that represents the form that was signed together with the file contents.
Derived file (optional) - a structure that represents a file created in the business process or as a result of processing the original file.
Variable - the name of the variable in which the file processing result will be saved.
JWT token - the user token.

An example of a message body for saving data from a file
{
  "payload": {
    "file": {
      "checksum": "....",
      "id": "process/bp-instance-id/uuid"
    },
    "derivedFile": {
      "checksum": "...",
      "id": "process/bp-instance-id/uuid"
    }
  }
}

All message metadata is passed in the message headers, together with the "X-Digital-*" headers typical for a business process.

X-Digital-Signature - the user signature.
X-Digital-Signature-Derived - the signature generated from the final message.
EntityName - the name of the data model object.
ResultVariable - the name of the persisted variable in which the file processing result will be saved.
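A sketch of how the delegate might assemble the outbound message from these fields, assuming plain string maps for headers and hand-built JSON to stay dependency-free. The header names come from the list above; FileRef, AsyncDataLoadMessage, and build are hypothetical names. Sending X-Digital-Signature-Derived only when a derived signature exists is an assumption, and a real delegate would also attach the standard X-Digital-* tracing headers and use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical file reference: a key (id) plus a checksum.
record FileRef(String id, String checksum) {
    // Simplification: no JSON escaping; assumes ids and checksums contain no quotes
    String toJson() {
        return "{\"checksum\":\"" + checksum + "\",\"id\":\"" + id + "\"}";
    }
}

// Hypothetical outbound message: metadata in headers, file references in the body.
record AsyncDataLoadMessage(Map<String, String> headers, String body) {

    // derivedFile and derivedSignature are optional and may be null (assumption)
    static AsyncDataLoadMessage build(String entityName, FileRef file, FileRef derivedFile,
                                      String signature, String derivedSignature, String resultVariable) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("X-Digital-Signature", signature);
        if (derivedSignature != null) {
            headers.put("X-Digital-Signature-Derived", derivedSignature);
        }
        headers.put("EntityName", entityName);
        headers.put("ResultVariable", resultVariable);

        // Only file references travel in the body
        StringBuilder payload = new StringBuilder("{\"file\":").append(file.toJson());
        if (derivedFile != null) {
            payload.append(",\"derivedFile\":").append(derivedFile.toJson());
        }
        payload.append('}');
        return new AsyncDataLoadMessage(headers, "{\"payload\":" + payload + "}");
    }
}
```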

7.3. Synchronous registry data management service

Validation follows the existing process: requests are proxied to the synchronous data management service. The rules on the permitted number of entities, as set by the modeler, are generated at the service generation stage.
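A minimal sketch of such a generated limit check, assuming the createAsyncLoad limits are available as a map, that the first CSV line is a header, and that blank lines are not counted. FileLimitValidator and validate are hypothetical names, not the service's actual API.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical validator built from createAsyncLoad limits (entity name -> maximum number of data rows).
final class FileLimitValidator {
    private final Map<String, Integer> limits;

    FileLimitValidator(Map<String, Integer> limits) {
        this.limits = Map.copyOf(limits);
    }

    // Returns an error message if the file must be rejected, or Optional.empty() if it may be loaded.
    Optional<String> validate(String entityName, String csv) {
        Integer limit = limits.get(entityName);
        if (limit == null) {
            return Optional.of("Asynchronous loading is not allowed for entity " + entityName);
        }
        // Assumption: the first line is a header and blank lines are not data rows
        long rows = csv.lines().skip(1).filter(line -> !line.isBlank()).count();
        if (rows > limit) {
            return Optional.of("File has " + rows + " rows; the limit for " + entityName + " is " + limit);
        }
        return Optional.empty();
    }
}
```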

7.4. Asynchronous registry data management service

The message is processed by the existing entity-saving handlers (createEntity, createCompositeEntity). The handler is selected dynamically according to the value of the entityName field; the routing table from entityName to handler is built at the generation stage.
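The routing from entityName to a handler can be pictured as a lookup table built at generation time. This sketch assumes a simple functional handler interface; EntityHandler and HandlerRouter are hypothetical stand-ins for the generated createEntity and createCompositeEntity handlers.

```java
import java.util.List;
import java.util.Map;

// Hypothetical handler contract: saves all parsed CSV rows for one entity.
@FunctionalInterface
interface EntityHandler {
    void save(List<Map<String, String>> rows);
}

// Hypothetical router: the entityName -> handler table would be produced at the generation stage.
final class HandlerRouter {
    private final Map<String, EntityHandler> routes;

    HandlerRouter(Map<String, EntityHandler> routes) {
        this.routes = Map.copyOf(routes);
    }

    void dispatch(String entityName, List<Map<String, String>> rows) {
        EntityHandler handler = routes.get(entityName);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for entity " + entityName);
        }
        handler.save(rows);
    }
}
```

Keeping the table immutable after construction mirrors the idea that routing is fixed at generation time rather than changed at runtime.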

The processing result consists of a status and details for the message, for example:

{
  "status": "SUCCESS",
  "details": "OK"
}
{
  "status": "CONSTRAINT_VIOLATION",
  "details": "error: {%s} in line: {%d}"
}

The text of the constraint-violation error is taken from the database procedure, and the line number comes from a counter maintained within the transaction.
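A simplified sketch of line-by-line processing with a line counter, assuming the saving procedure signals a constraint violation as an exception carrying its message. Rollback is modeled by discarding an in-memory batch; in the real service the enclosing database transaction would be rolled back instead. CsvLoadProcessor, LoadResult, and ConstraintViolationException are hypothetical names, and counting data lines from 1 without the header is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical exception standing in for a database constraint error raised by the procedure.
class ConstraintViolationException extends RuntimeException {
    ConstraintViolationException(String message) {
        super(message);
    }
}

record LoadResult(String status, String details) {}

final class CsvLoadProcessor {
    private CsvLoadProcessor() {}

    // saveLine stands in for the saving procedure; committed stands in for the database table.
    static LoadResult load(List<String> dataLines, Consumer<String> saveLine, List<String> committed) {
        List<String> batch = new ArrayList<>();
        int lineNumber = 0; // counter kept for the whole transaction
        try {
            for (String line : dataLines) {
                lineNumber++;
                saveLine.accept(line);
                batch.add(line);
            }
        } catch (ConstraintViolationException e) {
            // Nothing is committed: the batch is discarded, mirroring a transaction rollback
            return new LoadResult("CONSTRAINT_VIOLATION",
                    String.format("error: {%s} in line: {%d}", e.getMessage(), lineNumber));
        } catch (RuntimeException e) {
            return new LoadResult("OPERATION_FAILED", e.getMessage());
        }
        committed.addAll(batch); // single "commit" for the whole file
        return new LoadResult("SUCCESS", "OK");
    }
}
```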

7.5. Handler of data loading results event messages for the business process execution service

The result is correlated with the business process using the BusinessProcessInstanceId from the request context. The message name is generated dynamically from the entity name and the result status.

An example of a possible correlation
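import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

import org.camunda.bpm.engine.RuntimeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class AsyncDataLoadResponseKafkaListener {
    // Operation name used in event names, e.g. itemDataLoadCsvSuccess
    private static final String ACTION = "DataLoadCsv";

    @Autowired
    private RuntimeService runtimeService;

    @KafkaListener(topics = "data-load.csv.outbound")
    public void processAsyncMessages(
            @Payload AsyncDataLoadResponse message,
            MessageHeaders headers) {
        AsyncDataLoadResult payload = message.getPayload();
        RequestContext requestContext = message.getRequestContext();
        Result result = new Result(message.getStatus(), message.getDetails());

        // Event name = entity name + operation + result, e.g. itemDataLoadCsvConstraintViolation
        String messageName = payload.getEntityName() + ACTION + toPascalCase(String.valueOf(message.getStatus()));

        // Correlate with the waiting process instance and store the result in the configured variable
        runtimeService.createMessageCorrelation(messageName)
          .processInstanceId(requestContext.getProcessInstanceId())
          .setVariable(payload.getResultVariable(), result)
          .correlate();
    }

    // Converts an upper-case status such as CONSTRAINT_VIOLATION to ConstraintViolation
    private static String toPascalCase(String status) {
        return Arrays.stream(status.toLowerCase(Locale.ROOT).split("_"))
            .map(part -> Character.toUpperCase(part.charAt(0)) + part.substring(1))
            .collect(Collectors.joining());
    }
}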

8. High-level development plan

8.2. Development plan

  • Creation of a new form for uploading data from CSV files.

  • Liquibase library extension with additional tags.

  • Development of a new delegate for sending asynchronous messages.

  • Extension of the asynchronous registry data management service to process data loading messages.

  • Extension of the business process execution service with a component for processing incoming messages.

  • Development of an abstract example business process.

  • Changing the existing form so that the entity for validation no longer has to be specified (the field becomes optional, and validation is performed only if the field has a value).

  • Extending the registry data management services to save CSV files as files.