The Persister service provides a framework to persist data transactionally and with low latency, driven by a config file. It removes repetitive and time-consuming persistence code from other services.
Before you proceed with the documentation, make sure the following prerequisites are met:
Prior knowledge of Java/J2EE.
Prior knowledge of SpringBoot.
Prior knowledge of PostgreSQL.
Prior knowledge of JSONQuery in Postgres (similar to PostgreSQL, with a few aggregate functions).
Kafka server is up and running.
Persists data asynchronously using Kafka, providing very low latency
Data is persisted in batches
All operations are transactional
Values for the prepared statement placeholders are fetched using JsonPath
Easy reference to a parent object using ‘{x}’ in the jsonPath, which substitutes the variable x in the JsonPath with the value of x for the child object (explained in detail below in doc)
Supported data types: ARRAY, STRING, INT, DOUBLE, FLOAT, DATE, LONG, BOOLEAN, JSONB
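As a rough illustration of the parent reference described above, the fragment below is a sketch and is not taken from a real configuration. The payload shape (a Properties array whose entries each hold an owners array), the basePath notation, and the field names id and propertyId are all assumptions. It shows where the {id} placeholder would be replaced by the id of the child object currently being persisted, so that a field of the matching parent can be read.

```yaml
# Hypothetical fragment: persist each owner together with its parent property's id.
basePath: Properties.*.owners.*
jsonMaps:
  # Read directly from the child (owner) object.
  - jsonPath: $.Properties.*.owners.*.id
  # {id} is substituted with the current owner's id; the filter then selects
  # the parent property whose owners array contains that id.
  - jsonPath: '$.Properties[?({id} in @.owners[*].id)].propertyId'
```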
The persister uses a configuration file to persist data. The key variables are described below:
serviceName: Name of the service to which this configuration belongs.
description: Description of the service.
version: The version of the configuration.
fromTopic: The Kafka topic from which data is fetched.
queryMaps: Contains the list of queries to be executed for the given data.
query: The query to be executed, in the form of a prepared statement.
basePath: Base of the JSON object from which data is extracted.
jsonMaps: Contains the list of jsonPaths for the values in placeholders.
jsonPath: The jsonPath to fetch the variable value.
To persist a large quantity of data, the bulk setting in the persister can be used. It is mainly used when migrating data from one system to another. The bulk persister has two settings: persister.bulk.enabled and persister.batch.size.
Any Kafka topic containing data that has to be bulk persisted should have '-batch' appended to the end of the topic name, for example: save-pt-assessment-batch.
Each persister config has a version attribute which signifies the service version. This version can contain a custom DSL, defined by jsemver (GitHub: zafarkhaja/jsemver, a Java implementation of the SemVer Specification).
Every incoming request (via Kafka) is expected to have a version attribute set (jsonpath: $.RequestInfo.ver) if versioning is to be applied.
If the request version is absent or invalid (not semver) in the incoming request, the default version defined by the property default.version=1.0.0 in application.properties is used.
The request version is then matched against the loaded persister configs and applied appropriately.
Write the configuration as per the requirement. Refer to the sample configuration in this document.
In the environment file, mention the file path of the configuration under the variable egov.persist.yml.repo.path. The file path must be prefixed with file:///work-dir/, for example: egov.persist.yml.repo.path = file:///work-dir/configs/egov-persister/abc-persister.yml. If there are multiple files, separate them with a comma (,).
Deploy the latest version of the egov-persister service and push data to the Kafka topic specified in the config to persist it in the DB.
The persister configuration can be used by any module to store records in a particular table of the database.
Insert/update incoming Kafka messages into the database.
Add/modify the Kafka message before putting it into the database.
Persist data asynchronously.
Data is persisted in batch.
Write the configuration as per your requirement. The structure of the config file is explained above in this document.
Check in the config file to a remote location, preferably GitHub.
Provide the absolute path of the checked-in file to DevOps, to add it to the file-read path of egov-persister. The file will be added to egov-persister's environment manifest file so that it is read at start-up of the application.
Run the egov-persister app and push data to the Kafka topic specified in the config to persist it in the DB.
Variable Name | Default Value | Description |
---|---|---|
persister.bulk.enabled | false | Switch to turn the bulk Kafka consumer on or off |
persister.batch.size | 100 | The batch size for bulk update |
The Persister service persists data in the database asynchronously, providing very low latency. The queries used to insert/update data in the database are written in a YAML file. The values to be inserted are extracted from the JSON using the jsonPaths defined in the same YAML configuration.
Below is a sample configuration which inserts data in a couple of tables.
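The following is a minimal sketch of such a configuration rather than the production file. The topic and table names come from this document; the wrapper keys serviceMaps and mappings, the serviceName value, the basePath values, and every column and payload field name are assumptions made for illustration. Each ? in a query is filled, in order, from the jsonPaths listed under jsonMaps.

```yaml
# Sketch only: wrapper keys, column names and payload fields are hypothetical.
serviceMaps:
  serviceName: pgr-services
  mappings:
    - version: 1.0.0
      description: Persists complaint and address details
      fromTopic: save-pgr-request
      isTransaction: true
      queryMaps:
        # First statement: one row per complaint.
        - query: INSERT INTO eg_pgr_service_v2 (id, tenantid, servicecode, createdtime) VALUES (?, ?, ?, ?);
          basePath: service
          jsonMaps:
            - jsonPath: $.service.id
            - jsonPath: $.service.tenantId
            - jsonPath: $.service.serviceCode
            - jsonPath: $.service.auditDetails.createdTime
              type: LONG
        # Second statement: the address linked to the complaint.
        - query: INSERT INTO eg_pgr_address_v2 (id, tenantid, parentid, city) VALUES (?, ?, ?, ?);
          basePath: service.address
          jsonMaps:
            - jsonPath: $.service.address.id
            - jsonPath: $.service.address.tenantId
            - jsonPath: $.service.id
            - jsonPath: $.service.address.city
```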
The above configuration is used to insert data published on the Kafka topic save-pgr-request into the tables eg_pgr_service_v2 and eg_pgr_address_v2. Similarly, the configuration can be written to update data. Following is a sample configuration:
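A minimal sketch of an update configuration, under stated assumptions: the topic name update-pgr-request, the wrapper keys serviceMaps and mappings, and all column and payload field names are hypothetical. Note that the jsonPaths are listed in the same order as the ? placeholders, so the path for the WHERE clause comes last.

```yaml
# Sketch only: topic, wrapper keys, column names and payload fields are hypothetical.
serviceMaps:
  serviceName: pgr-services
  mappings:
    - version: 1.0.0
      description: Updates complaint details
      fromTopic: update-pgr-request
      isTransaction: true
      queryMaps:
        - query: UPDATE eg_pgr_service_v2 SET servicecode = ?, lastmodifiedtime = ? WHERE id = ?;
          basePath: service
          jsonMaps:
            - jsonPath: $.service.serviceCode
            - jsonPath: $.service.auditDetails.lastModifiedTime
              type: LONG
            # Bound to the WHERE id = ? placeholder.
            - jsonPath: $.service.id
```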
The above configuration is used to update the data in the tables. Similarly, an upsert operation can be done using the ON CONFLICT clause in PostgreSQL.
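An upsert could look roughly like the queryMaps fragment below. This is a sketch: the column and payload field names are hypothetical, and it assumes a unique or primary key constraint on id, which ON CONFLICT (id) requires. EXCLUDED refers to the row that was proposed for insertion.

```yaml
# Sketch only: column names and payload fields are hypothetical.
queryMaps:
  - query: INSERT INTO eg_pgr_service_v2 (id, tenantid, servicecode) VALUES (?, ?, ?) ON CONFLICT (id) DO UPDATE SET servicecode = EXCLUDED.servicecode;
    basePath: service
    jsonMaps:
      - jsonPath: $.service.id
      - jsonPath: $.service.tenantId
      - jsonPath: $.service.serviceCode
```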
The table below describes each field variable in the configuration.
Variable Name | Description |
---|---|
serviceName | The module name to which the configuration belongs |
version | Version of the config |
description | Detailed description of the operations performed by the config |
fromTopic | Kafka topic from which data has to be persisted in the DB |
isTransaction | Flag to enable/disable performing the operations as a transaction |
query | Prepared statements to insert/update data in the DB |
basePath | JsonPath of the object that has to be inserted/updated |
jsonPath | JsonPath of the fields that have to be inserted in the table columns |
type | Type of the field |
dbType | DB type of the column in which the field is to be inserted |