BlockingCollection bounded capacity performance degradation
I've got this C# process (.NET 5.0) that reads from a zip file, deserializes the JSON to an object, and then transforms the JSON objects to DataTables for storage in a SQL Server database. After a lot of testing and optimization, I got these three phases to be very nearly identical in processing time (via Stopwatch measurements).
I thought I could improve throughput by having separate threads for each phase, but when I tried running it, the BlockingCollection<T> performance went down the tubes pretty quickly. I bounded the two queues to keep any one phase from getting too far off pace with any other, but after a short while, I got this very gap-toothed performance - spurts of activity with long periods of cpu quiescence.
Did I find some kind of degenerate case? Does BlockingCollection<T> have a lot of overhead relating to the boundedCapacity?
The implementation looked like this:
    var readingQueue = new BlockingCollection<string>(1000);
    var objectQueue = new BlockingCollection<JsonObject>(1000);
    var phases = new Task[3];
    // Phase 1: read lines; a null line marks the end of the input.
    phases[0] = Task.Run(() =>
    {
        for (;;)
        {
            var l = reader.ReadLine();
            readingQueue.Add(l);
            if (l == null)
                break;
        }
    });
    // Phase 2: deserialize each line and pass the end marker along.
    phases[1] = Task.Run(() =>
    {
        for (;;)
        {
            var json = readingQueue.Take();
            if (json == null)
            {
                objectQueue.Add(null);
                break;
            }
            var o = Deserializer.Deserialize<JsonObject>(json);
            objectQueue.Add(o);
        }
    });
    // Phase 3: transform each object into the DataSet (set).
    phases[2] = Task.Run(() =>
    {
        for (;;)
        {
            var o = objectQueue.Take();
            if (o == null)
                break;
            TransformJsonObject(set, o);
        }
    });
    Task.WaitAll(phases);
Dropping the BlockingCollections entirely and just using Task.Run(() => reader.ReadLine()) for the I/O produces a benefit, but parallelizing all three phases with BlockingCollection<T> goes south pretty fast.
EDIT:
I tried dropping to two threads and moving the work around, but whenever there was a BlockingCollection involved it got worse than the single threaded performance and the memory consumption went through the roof.
The version that worked best was
    var nextLine = Task.Run(() => reader.ReadLine());
    for (;;)
    {
        var json = nextLine.Result;
        // Start reading the next line while this one is processed.
        nextLine = Task.Run(() => reader.ReadLine());
        if (json == null)
            break;
        var o = Deserializer.Deserialize<JsonObject>(json);
        TransformJsonObject(set, o);
    }
The timings with that version were Total time spent Reading: 8388140 ms, Deserializing: 8870633 ms, Transform: 9240809 ms, Writing to db: 10231972 (separate queue)
but the middle 2 were synchronous. I noticed there was a slight weight on the last step, so I tried putting read and deserialize in one thread and transform to DataSet on another, and the performance was still way below the above.
That's over about 22 million lines/objects.
EDIT: to move some of the comment discussion into the main section, I was given this program to maintain. We get daily dumps of largeish zip files. The program starts up a configurable number of threads to process the zip files (currently set at 5). Originally, each thread did the read/deserialize/transform to DataSet/write DataSet to Sql Server steps synchronously.
The first thing I did was to add a "write to db" thread/queue, and that worked well.
Then I started improving the times of the read/deserialize/transform steps... Cleaning up code, swapping one deserializer for another, etc. The timings for each of those phases were getting near identical, so I thought I'd parallelize further to try and improve the speed.
Now each of the zip file threads had one BlockingCollection for each line from the jsonl file, and one for the deserialized objects. Each thread fires up Tasks for the reading and the deserialization. The main file processing thread pulled from the deserialized object collection, did the transforms, and put the result on the db writing queue.
At that level of parallelization, the process ended up taking more than twice as long. I took a minidump of the process, and I found each thread's BlockingCollections completely empty, the db writing queue empty, and almost 5 gig of RAM in use somewhere.
The individual phase stats (like the time spent on file I/O and deserializing the objects) were double what they were when the 5 file processing threads ran the read/deserialize/transform steps synchronously. That's the part that puzzled me: it takes longer, uses a bunch of phantom RAM, and all the queues are empty when doing these things in parallel, compared to doing 3 of the 4 steps synchronously.
I did find Stack Overflow posts asserting that bounded BlockingCollections would sometimes wedge when they hit their bounds, but not a lot of detail as to why.
Solution 1:[1]
BlockingCollection will perform poorly if all the collections are always simultaneously at their bounded capacity. The result is performance that mimics a single-threaded implementation, but with the blocking overhead added on top. The thread pool will also perform sub-optimally if Tasks have a lot of blocking in them. It might be worth exploring the TryTake and TryAdd methods of the collections and allowing the idle tasks to yield.
    for (;;)
    {
        if (!collection.TryTake(out item))
        {
            // there's nothing to do, so we'll just chill out
            await Task.Delay(/*whatever interval makes sense*/);
            continue;
        }
        // process item here
    }
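To show both sides of that suggestion, here is a minimal sketch with a producer that yields when TryAdd finds the queue full and a consumer that yields when TryTake finds it empty. The queue size, the 1 ms delay, and the integer payload are assumptions chosen for illustration, not values from the question; the sketch also uses CompleteAdding and IsCompleted to end the loop rather than a null marker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed values for illustration only.
var queue = new BlockingCollection<int>(boundedCapacity: 2);
var idleDelay = TimeSpan.FromMilliseconds(1);
var received = new List<int>();

var producer = Task.Run(async () =>
{
    for (var i = 0; i < 10; i++)
    {
        // TryAdd returns false right away when the queue is at capacity,
        // so the task yields instead of blocking a thread pool thread.
        while (!queue.TryAdd(i))
            await Task.Delay(idleDelay);
    }
    queue.CompleteAdding(); // signal that no more items will arrive
});

var consumer = Task.Run(async () =>
{
    // IsCompleted becomes true once adding is complete and the queue is empty.
    while (!queue.IsCompleted)
    {
        if (!queue.TryTake(out var item))
        {
            await Task.Delay(idleDelay);
            continue;
        }
        received.Add(item);
    }
});

await Task.WhenAll(producer, consumer);
```

Whether this beats plain Add and Take depends on how often the queues are actually full or empty, so it is worth measuring before committing to it.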
It's also worth noting that even if you tune the processing to work well in your current environment, that won't necessarily translate to your target environment, so you may find yourself constantly tweaking the workload between the tasks to get acceptable performance.
Since this is a pipeline of operations, you'll probably have better luck with BufferBlock and other parts of the TPL. They also have the advantage of being async/await compatible, so there's less blocking in general, and they support the same bounding limits as a BlockingCollection.
Here's a link to a tutorial that demonstrates the basics. BufferBlock allows chaining the blocks together and managing the pipeline as a unit, and it supports cancellation.
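As a rough sketch of what such a pipeline could look like, the following chains a BufferBlock, a TransformBlock, and an ActionBlock from System.Threading.Tasks.Dataflow, each bounded to 1000 items like the queues in the question. The Deserialize helper and the doubling "transform" are hypothetical stand-ins for the question's deserializer and TransformJsonObject, and the three input strings stand in for lines read from the zip file.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the real deserializer.
static int Deserialize(string line) => int.Parse(line);

var results = new List<int>();

var stageOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1000 };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var lines = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 1000 });
var deserialize = new TransformBlock<string, int>(line => Deserialize(line), stageOptions);
// Stand-in for the transform step; an ActionBlock handles one item at a time by default.
var transform = new ActionBlock<int>(o => results.Add(o * 2), stageOptions);

// Link the stages so that completion flows down the pipeline.
lines.LinkTo(deserialize, linkOptions);
deserialize.LinkTo(transform, linkOptions);

foreach (var line in new[] { "1", "2", "3" })
{
    // SendAsync waits without blocking a thread when the block is full.
    await lines.SendAsync(line);
}

lines.Complete();           // no more input
await transform.Completion; // wait for the last stage to drain
```

Because completion propagates through the links, awaiting the final block is enough to know the whole pipeline has finished, which removes the need for null end markers.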
EDIT: If this is a long running operation, like your stats suggest, then you could benefit from using full Threads.
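One way to read that suggestion is to give each long-running phase its own dedicated Thread instead of a thread pool Task. The sketch below assumes a single bounded queue between a reading thread and a consuming thread; the three hard-coded strings are a stand-in for reader.ReadLine(), and CompleteAdding with GetConsumingEnumerable replaces the null end marker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

var queue = new BlockingCollection<string>(boundedCapacity: 1000);
var consumed = new List<string>();

// Dedicated thread for the reading phase, so blocking in Add
// does not tie up a thread pool thread.
var producer = new Thread(() =>
{
    foreach (var line in new[] { "a", "b", "c" }) // stand-in for reader.ReadLine()
        queue.Add(line);
    queue.CompleteAdding();
}) { IsBackground = true };

// Dedicated thread for the consuming phase.
var consumer = new Thread(() =>
{
    // The enumeration ends once adding is complete and the queue is empty.
    foreach (var line in queue.GetConsumingEnumerable())
        consumed.Add(line);
}) { IsBackground = true };

producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
```

Task.Factory.StartNew with TaskCreationOptions.LongRunning is a related option that hints to the scheduler that the work should not occupy a regular pool thread.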
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
