How to handle dynamic index creation in Elasticsearch using Apache NiFi?

I am routing data to Elasticsearch using NiFi, which dynamically creates indices based on a set of attributes. I'm using Index Lifecycle Management (ILM) in Elasticsearch, which requires each index to be manually bootstrapped beforehand for the ILM settings to be applied. Since my NiFi flow ingests messages into Elasticsearch automatically, any index that gets created automatically will not have an ILM policy applied.

Currently my flow is: NiFi consumes from Kafka --> UpdateAttribute --> PutElasticsearchRecord.

A solution (I think) would be to put an InvokeHTTP processor in front of the PutElasticsearchRecord processor to bootstrap the indices dynamically, using the extracted attributes, before ingesting into Elasticsearch. Indices are named using the syntax index_${attribute_1}_${attribute_2}. My only concern is that the InvokeHTTP processor would run for every new flowfile, which could mean thousands of calls to bootstrap an index. And if the index already exists, there could be a collision.
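For reference, the request that InvokeHTTP would need to send can be sketched in Python. This is only a sketch, assuming the rollover-style bootstrap from the Elasticsearch ILM documentation (a first index with a -000001 suffix plus a write alias) and assuming an index template already attaches the ILM policy. The function names and the choice of alias name are hypothetical. On the collision concern: Elasticsearch rejects creation of an index that already exists with HTTP 400 and a resource_already_exists_exception, so a repeated call fails without modifying the existing index.

```python
import json


def bootstrap_request(attribute_1, attribute_2):
    """Build the (method, path, body) for bootstrapping one ILM-managed index.

    Assumption: the alias reuses the index_${attribute_1}_${attribute_2} name
    and the first backing index gets the conventional -000001 suffix.
    """
    alias = f"index_{attribute_1}_{attribute_2}"
    first_index = f"{alias}-000001"
    body = {"aliases": {alias: {"is_write_index": True}}}
    return "PUT", f"/{first_index}", json.dumps(body)


def is_already_exists(status_code, response_body):
    """Return True if the response says the index was already bootstrapped."""
    if status_code != 400:
        return False
    error = json.loads(response_body).get("error", {})
    return error.get("type") == "resource_already_exists_exception"
```

With this shape, a duplicate bootstrap attempt is harmless and can be treated as a success, although it still costs one HTTP round trip per flowfile.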

Is this really the best way to do this? Perhaps I could run the QueryElasticsearchRecord processor to get a list of indices and somehow match it against incoming flowfiles on the attribute_1 and attribute_2 fields. But that would still require a continuous query, I think?



Solution 1:[1]

What you could do is have InvokeHTTP run if and only if it sees a specific value or attribute signaling that a new index (one not previously sent to Elasticsearch) needs to be created. Just an idea if you want to head down that route.
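The "only when the index is new" idea can be sketched as a small seen-set in front of the bootstrap call. This is a minimal illustration in Python, not a NiFi configuration: the class name and the bootstrap callable are hypothetical, and the set lives in memory, so it would be lost on restart. In a NiFi flow the same role might be played by a processor such as DetectDuplicate keyed on the index name, but that mapping is an assumption rather than something stated above.

```python
class IndexBootstrapper:
    """Call a bootstrap function at most once per index name."""

    def __init__(self, bootstrap):
        # bootstrap: hypothetical callable taking the index name,
        # e.g. a function that sends the PUT request to Elasticsearch.
        self._bootstrap = bootstrap
        self._seen = set()

    def ensure(self, index_name):
        """Bootstrap index_name if unseen; return True if a call was made."""
        if index_name in self._seen:
            return False
        self._bootstrap(index_name)
        # Record the name only after the bootstrap call returned normally,
        # so a failed attempt is retried on the next flowfile.
        self._seen.add(index_name)
        return True


# Usage: only the first flowfile for each index triggers a bootstrap call.
calls = []
bootstrapper = IndexBootstrapper(calls.append)
results = [bootstrapper.ensure(name)
           for name in ["index_a_b", "index_a_b", "index_c_d", "index_a_b"]]
```

With thousands of flowfiles and only a handful of distinct attribute combinations, the number of HTTP calls then tracks the number of distinct indices rather than the number of flowfiles.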

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Mike R