Re-indexing is used to push all the existing data into the respective Elasticsearch index. It involves two steps: first, run the Kafka Connect sink connector from the playground; then, call the legacy index API on the egov-indexer service, which internally calls the respective module's plain search service to fetch the data and sends it to the respective index.
If a connector with the same name already exists, delete it first:

curl --location --request DELETE 'http://kafka-connect.kafka-cluster:8083/connectors/firenoc-services-enriched-es-sink'
Next, create the ES sink connector for the enriched topic:

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--data '{
  "name": "firenoc-services-enriched-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "general",
    "tasks.max": "1",
    "max.retries": "15",
    "key.ignore": "false",
    "retry.backoff.ms": "5000",
    "max.buffered.records": "25",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "read.timeout.ms": "100000",
    "topics": "firenoc-services-enriched",
    "batch.size": "25",
    "max.in.flight.requests": "2",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "flush.timeout.ms": "3600000",
    "errors.deadletterqueue.topic.name": "firenoc-services-enriched-failed",
    "errors.tolerance": "all",
    "value.converter.schemas.enable": "false",
    "name": "firenoc-services-enriched-es-sink",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200",
    "linger.ms": "1000",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": "firenoc-services-enriched*",
    "transforms.TopicNameRouter.replacement": "firenoc-services-enriched"
  }
}'
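Before triggering the re-index, it is worth confirming that the connector and its task are RUNNING. This is a minimal check using the standard Kafka Connect REST status endpoint, assuming the same Kafka Connect URL as above:

curl --location --request GET 'http://kafka-connect.kafka-cluster:8083/connectors/firenoc-services-enriched-es-sink/status'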
Now trigger the legacy index run on the egov-indexer service (assumed here to be reachable on localhost:8055, e.g. via a port-forward):

curl --location --request POST 'http://localhost:8055/egov-indexer/index-operations/_legacyindex' \
--header 'Content-Type: application/json' \
--data-raw '{
  "RequestInfo": {
    "apiId": "string",
    "ver": "string",
    "ts": null,
    "action": "string",
    "did": "string",
    "key": "string",
    "msgId": "string",
    "authToken": "155c6208-7e84-4e44-a353-4b0e149e795e",
    "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
    "userInfo": {
      "id": 23299,
      "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
      "userName": "9337682030",
      "name": "Abhilash Seth",
      "type": "CITIZEN",
      "mobileNumber": "9337682030",
      "emailId": "abhilash.seth@gmail.com",
      "roles": [
        {
          "id": 281,
          "name": "Citizen"
        }
      ]
    }
  },
  "apiDetails": {
    "uri": "http://firenoc-services.egov:8080/firenoc-services/v1/_search",
    "tenantIdForOpenSearch": "pb",
    "paginationDetails": {
      "offsetKey": "offset",
      "sizeKey": "limit",
      "maxPageSize": 25,
      "limit": 25
    },
    "responseJsonPath": "$.FireNOCs"
  },
  "legacyIndexTopic": "legacy-fn-firenoc",
  "tenantId": "pb.amritsar"
}'
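While the legacy index job runs, you can watch the document count grow in the target index. This is a standard Elasticsearch _count call against the same ES host given in the connector's connection.url:

curl --location --request GET 'http://elasticsearch-data-v1.es-cluster:9200/firenoc-services-enriched/_count'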
Once the re-indexing job has finished and the data is in Elasticsearch, delete the connector again:

curl --location --request DELETE 'http://kafka-connect.kafka-cluster:8083/connectors/firenoc-services-enriched-es-sink'
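To confirm the deletion, list the connectors that are still registered (standard Kafka Connect REST endpoint); the sink should no longer appear:

curl --location --request GET 'http://kafka-connect.kafka-cluster:8083/connectors'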
Finally, point the firenoc-services alias at the enriched index so that searches against firenoc-services read the re-indexed data. Run the following in Kibana Dev Tools (or as the equivalent curl against the ES host):

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "firenoc-services-enriched",
        "alias": "firenoc-services"
      }
    }
  ]
}
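You can verify that the alias now resolves to the enriched index. This uses the standard Elasticsearch cat aliases API, assuming the same ES host as above:

curl --location --request GET 'http://elasticsearch-data-v1.es-cluster:9200/_cat/aliases/firenoc-services?v'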
Your firenoc-services index is now updated with all the FireNOC data in the system so far.