How to make this asynchronous method invocation work?
I was trying to develop a method pipeline using asynchronous method invocation. The logic for the pipeline is as follows:
- A collection holds n items that have to be fed through m methods in a pipeline
- Enumerate a collection of T
- Feed the first element to the first method
- Get the output, feed it to the second method asynchronously
- At the same time, feed the second element of the collection to the first method
- After the first method completes, feed the result to the second method (if the second method is still running, put the result into its queue and start executing the third element in the first method)
- When the second method finishes executing, take the first element from its queue and execute it, and so on (every method should run asynchronously; none should wait for the next one to finish)
- At the mth method, after executing the data, store the result to a list
- After completing nth element at the mth method return the list of the results (n number of results) to the very first level.
I came up with the following code, but it does not work as intended: the result never gets returned and, moreover, the methods do not execute in the order they should.

    static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;
        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source)
            {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }

        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock (Sync)
            {
                if (_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;
                _func.BeginInvoke(element, CallBack, resetEvent);
            }
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;
            lock (Sync)
            {
                _isBusy = false;
                if (Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);
                if (DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if (Start._result.Count == Count)
                    set = true;
            }
            if (set)
                resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

Can you guys please help me to make this thing work? If this design is not good for an actual method pipeline, please feel free to suggest a different one.
Edit: I have to stick strictly to .NET 3.5.
Solution 1:[1]
Any particular reason for taking the pipeline approach? IMO, launching a separate thread for each input, with all functions chained one after another, would be simpler to write and faster to execute. For example:

    // Runs one input through every function in the pipe, in order.
    static T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
    {
        T value = input;
        foreach (var f in pipe)
        {
            value = f(value);
        }
        return value;
    }

    var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
    var list = new List<int> { 1, 2, 3, 4 };
    foreach (var value in list)
    {
        // One thread-pool work item per input; the return value is discarded here.
        ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
    }

Now, coming to your code: I believe that an accurate pipeline implementation with M stages must have exactly M threads, since each stage can execute in parallel. Some of those threads may sit idle because input has not reached them yet. I am not certain whether your code launches any threads, or how many threads exist at any particular time.
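The "one thread per stage" idea above can be sketched as follows. This is only an illustration under stated assumptions, not the asker's or the answerer's code: `WorkQueue<T>`, `StagePipeline` and `Run` are hypothetical names, and the blocking queue is hand-rolled on `Monitor` because `BlockingCollection<T>` is not part of .NET 3.5. A stage that throws is not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: a minimal blocking FIFO queue built on Monitor.
internal sealed class WorkQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private bool _completed;

    public void Add(T item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_items); // wake the consumer if it is waiting
        }
    }

    // Signals that no more items will arrive.
    public void Complete()
    {
        lock (_items)
        {
            _completed = true;
            Monitor.PulseAll(_items);
        }
    }

    // Blocks until an item is available; returns false once the queue
    // is both empty and completed.
    public bool TryTake(out T item)
    {
        lock (_items)
        {
            while (_items.Count == 0 && !_completed)
                Monitor.Wait(_items);
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }
}

// Hypothetical entry point: M stages, M threads, M + 1 queues.
public static class StagePipeline
{
    public static List<T> Run<T>(IEnumerable<T> source, params Func<T, T>[] stages)
    {
        // queues[i] feeds stage i; the last queue collects the final results.
        var queues = new WorkQueue<T>[stages.Length + 1];
        for (int i = 0; i < queues.Length; i++)
            queues[i] = new WorkQueue<T>();

        var threads = new List<Thread>();
        for (int i = 0; i < stages.Length; i++)
        {
            // Copy to locals so each lambda captures its own stage and queues.
            Func<T, T> stage = stages[i];
            WorkQueue<T> input = queues[i];
            WorkQueue<T> output = queues[i + 1];
            var thread = new Thread(() =>
            {
                T item;
                while (input.TryTake(out item))
                    output.Add(stage(item));
                output.Complete(); // propagate end-of-input downstream
            });
            thread.IsBackground = true;
            thread.Start();
            threads.Add(thread);
        }

        foreach (T element in source)
            queues[0].Add(element);
        queues[0].Complete();

        foreach (Thread thread in threads)
            thread.Join();

        // Each stage is a single thread reading a FIFO queue,
        // so results come out in input order.
        var results = new List<T>();
        T result;
        while (queues[stages.Length].TryTake(out result))
            results.Add(result);
        return results;
    }
}
```

Called with the question's data, `StagePipeline.Run(new List<int> { 1, 2, 3, 4 }, Add, Square, Add, Square)` should yield 25, 100, 289, 676. Because every stage has its own queue, no stage waits for a later one to finish, which is the behaviour the question asks for.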
Solution 2:[2]
Why don't you break off a thread for each iteration and aggregate your results in a locked resource? You could use PLINQ for this. I think you might be mistaking methods for resources: you only need to lock a method if it contains a critical block that touches a shared resource. By picking an item off and breaking into a new thread from there, you eliminate the need to manage your second method.
I.e., MethodX calls Method1 and then passes the value into Method2:

    foreach (var item in arr) Async(MethodX(item));
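A minimal sketch of this per-item approach, assuming the .NET 3.5 constraint from the question: PLINQ ships with .NET Framework 4, so the thread pool is used here instead. `PerItemRunner` and `Run` are hypothetical names introduced only for illustration. Results are written into a shared array under a lock, and a countdown signals when the last item finishes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: one thread-pool work item per input element.
public static class PerItemRunner
{
    public static T[] Run<T>(IList<T> inputs, params Func<T, T>[] pipe)
    {
        var results = new T[inputs.Count];
        if (inputs.Count == 0)
            return results;

        object sync = new object();
        int pending = inputs.Count;
        using (var done = new ManualResetEvent(false))
        {
            for (int i = 0; i < inputs.Count; i++)
            {
                int index = i; // per-iteration copy so the lambda captures the right slot
                ThreadPool.QueueUserWorkItem(state =>
                {
                    // Chain every function for this one element.
                    T value = inputs[index];
                    foreach (Func<T, T> f in pipe)
                        value = f(value);

                    // The shared state is the only thing that needs locking.
                    lock (sync)
                    {
                        results[index] = value;
                        pending--;
                        if (pending == 0)
                            done.Set();
                    }
                });
            }
            done.WaitOne(); // block until every work item has reported in
        }
        return results;
    }
}
```

Storing each result at its input index keeps the output in input order even though the work items may finish in any order. With the question's data and functions, the array should contain 25, 100, 289, 676.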
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Slappy |
