NiFi: how to manage out-of-order data when EnforceOrder is not applicable?
I'm receiving real-time data that can arrive out of order: some data arrives late compared to the rest. For later event processing I need to reorder the stream of flowfiles, and I'm trying to find out whether that's possible in NiFi.
My understanding of EnforceOrder is that it requires an incrementing integer as a flowfile attribute, plus knowledge of the starting point.
As I'm dealing with a real-time flow, I don't have a starting point, so I don't see how to make that work.
What I have is a timestamp in the data that I can extract, convert to a Unix timestamp, and write to the "priority" attribute. After that, I set up a queue with Prioritizer = PriorityAttributePrioritizer. But the flowfiles don't stay in the queue long enough to compensate for the out-of-order arrivals, because all the processors after it are fast (I'm dealing with 1,200 flowfiles/sec).
a) Is there a way to make a queue act as a buffer that keeps the throughput but delays flowfiles just long enough for a fair number of them to accumulate and be prioritized?
b) Otherwise, how else can an out-of-order issue be fixed?
Solution 1:[1]
I found a solution with two methods: either wait for a fixed time before releasing a group of flowfiles (solution 2), or wait for a given number of flowfiles (solution 1). I use one method or the other, not both together.
In the flow picture, I've described both, labeled solution 1 and solution 2.
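Both methods are forms of buffering before sorting. For comparison outside NiFi, the general technique is a bounded-lateness reordering buffer: keep events in a priority queue and release one only when no earlier event can still arrive. The sketch below is plain Python, not a NiFi processor, and assumes a known maximum lateness `max_lateness` (a hypothetical parameter): every event's timestamp must be greater than the newest timestamp seen before it minus `max_lateness`.

```python
import heapq
from typing import Iterable, Iterator, List, Optional, Tuple

def reorder(events: Iterable[Tuple[int, str]], max_lateness: int) -> Iterator[Tuple[int, str]]:
    """Yield (timestamp, payload) pairs in timestamp order.

    Assumes each event's timestamp is > (max timestamp seen before it) - max_lateness.
    """
    heap: List[Tuple[int, int, str]] = []
    watermark: Optional[int] = None  # highest timestamp seen so far
    seq = 0  # tie-breaker so equal timestamps keep their arrival order
    for ts, payload in events:
        heapq.heappush(heap, (ts, seq, payload))
        seq += 1
        watermark = ts if watermark is None else max(watermark, ts)
        # Events at or below watermark - max_lateness can no longer be overtaken
        while heap and heap[0][0] <= watermark - max_lateness:
            t, _, p = heapq.heappop(heap)
            yield t, p
    # End of stream: flush whatever is left, in order
    while heap:
        t, _, p = heapq.heappop(heap)
        yield t, p
```

A larger `max_lateness` tolerates later arrivals at the cost of latency; at 1,200 flowfiles/sec, a 10-second window would hold roughly 12,000 items in the buffer.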
Solution 2:[2]
I designed a "waiter" process group for this. In my case, I had to wait 10 seconds to sort my flows. The process group below basically loops flowfiles around until a certain period has elapsed.
You can also apply your prioritizer to the unmatched queue, which should be large enough to hold everything that is waiting.
(Screenshots: RouteOnAttribute and UpdateAttribute configurations.)
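The intent of the waiter loop can be sketched as a time-based hold buffer in plain Python. This is a simplified model, not the NiFi flow itself: `arrivals` holds hypothetical `(arrival_time, priority)` pairs in integer ticks, the stored entry time stands in for an attribute the UpdateAttribute step might set, and the head of the prioritized queue is released only after it has been held for `wait` ticks, so a later-prioritized item cannot overtake it. In the real flow, RouteOnAttribute evaluates each flowfile on its own attributes, so the ordering there relies on the prioritized unmatched queue.

```python
import heapq
from typing import List, Tuple

def hold_and_release(arrivals: List[Tuple[int, int]], wait: int) -> List[Tuple[int, int]]:
    """arrivals: (arrival_time, priority) pairs sorted by arrival_time.

    Returns (release_time, priority) pairs in release order.
    """
    queue: List[Tuple[int, int, int]] = []  # (priority, seq, entry_time); lowest priority value first
    released: List[Tuple[int, int]] = []
    i, t, seq = 0, 0, 0
    while i < len(arrivals) or queue:
        # Enqueue everything that has arrived by tick t, remembering its entry time
        while i < len(arrivals) and arrivals[i][0] <= t:
            heapq.heappush(queue, (arrivals[i][1], seq, arrivals[i][0]))
            seq += 1
            i += 1
        # Release from the head while the head has been held for at least `wait` ticks
        while queue and t - queue[0][2] >= wait:
            prio, _, _ = heapq.heappop(queue)
            released.append((t, prio))
        t += 1
    return released
```

For example, with `wait=3`, arrivals `[(0, 5), (2, 3), (4, 8), (5, 6)]` come out with priorities 3, 5, 6, 8. An item that arrives more than `wait` ticks after a higher-priority item has already left will still be out of order.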
Solution 3:[3]
I have something here that seems to work at low speed, but for the moment it breaks down at higher speed.
The first block is just a simulator that generates one flowfile per second. The UpdateAttribute processor is where the priority attribute used for sorting can be generated.
The rest manages bursts:
- Increment a counter for each flowfile
- Wait for the counter to reach a limit
- When flowfiles go through, reset the counter
The first Notify increments the counter by 1 for each flowfile.
The Wait lets flowfiles through only once the counter reaches 5. The queue in front of the Wait uses PriorityAttributePrioritizer; all the other queues are set to FIFO.
The last Notify resets the counter to zero.
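The counter logic above can be modeled in a few lines of plain Python, under the simplifying assumption that the Wait releases everything queued in front of it as soon as the counter reaches the target. In NiFi terms this roughly corresponds to a Notify with a Signal Counter Delta of 1, a Wait with a Target Signal Count of 5, and a final Notify that resets the counter; the function and parameter names here are hypothetical, and the single-threaded loop ignores NiFi's concurrent scheduling.

```python
from typing import Iterable, List

def counter_gate(priorities: Iterable[int], target: int = 5) -> List[List[int]]:
    """Simulate Notify(+1) -> prioritized queue -> Wait(target) -> Notify(reset).

    Returns the batches released by the Wait, each in priority order.
    """
    waiting: List[int] = []  # queue in front of the Wait
    counter = 0              # stand-in for the shared signal counter
    batches: List[List[int]] = []
    for p in priorities:
        waiting.append(p)
        counter += 1  # first Notify: +1 per flowfile
        if counter >= target:
            # Wait opens; the prioritizer hands flowfiles out lowest value first
            batches.append(sorted(waiting))
            waiting.clear()
            counter = 0  # last Notify: reset the counter to zero
    # Fewer than `target` leftover flowfiles stay queued until more arrive
    return batches
```

Ordering is only guaranteed within a batch: a flowfile that arrives after its neighbors' batch has been released still comes out late.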
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | |
| Solution 3 | |