Durable Functions & external API requests
I am migrating a long-running process from a .NET ASP.NET Core backend to a durable function; this process handles an import that gathers resources from an external API.
As of now code works, but the sub-orchestrator Import_ImportTask calls the external API several times.
This is a cleaned-up version of the workflow I am trying to run. How can I be sure that when a function is resumed it does not start over?
/// <summary>
/// HTTP entry point: reads an <c>ImportRequest</c> from the POST body, starts a new
/// "Handle_Import" orchestration with it, and returns the check-status response.
/// </summary>
/// <param name="req">Incoming HTTP request whose content carries the import request.</param>
/// <param name="starter">Durable client used to start the orchestration.</param>
/// <param name="log">Logger for the function invocation.</param>
/// <returns>The response produced by <c>CreateCheckStatusResponse</c> for the new instance.</returns>
[FunctionName("Import_Start")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    ImportRequest payload = await req.Content.ReadAsAsync<ImportRequest>();

    // Passing null as the instance id lets the runtime pick one.
    string orchestrationId = await starter.StartNewAsync<ImportRequest>("Handle_Import", null, payload);
    log.LogInformation($"Started orchestration with ID = '{orchestrationId}'.");

    HttpResponseMessage statusResponse = starter.CreateCheckStatusResponse(req, orchestrationId);
    return statusResponse;
}
/// <summary>
/// Top-level orchestrator: for each import task in the request, runs one
/// "Import_ImportTask" sub-orchestration and waits for it before starting the next.
/// </summary>
/// <param name="context">Orchestration context that supplies the input and schedules sub-orchestrations.</param>
/// <param name="log">Function logger; wrapped in a replay-safe logger before use.</param>
[FunctionName("Handle_Import")]
public async Task RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    // Orchestrator code is re-executed from the top each time the instance resumes
    // (replay). A plain ILogger would write the same lines again on every replay;
    // the replay-safe wrapper only writes when the orchestrator is not replaying.
    log = context.CreateReplaySafeLogger(log);

    var importRequest = context.GetInput<ImportRequest>();
    foreach (var importTask in importRequest.ImportTasks)
    {
        var importImportTaskRequest = new ImportImportTaskRequest(importRequest, importTask);
        log.LogWarning($"Handling ImportTask {importTask}");

        // context.NewGuid() is deterministic across replays, so the sub-orchestration
        // keeps the same instance id every time this code is replayed.
        await context.CallSubOrchestratorAsync("Import_ImportTask", context.NewGuid().ToString(), importImportTaskRequest);
    }
    //TODO SendNotification
}
/// <summary>
/// Sub-orchestrator for a single import task: fetches the source item list through the
/// "Import_GetSourceItems" activity, imports each item in turn through "Import_Item",
/// then hands the results to "Import_Save".
/// </summary>
/// <param name="context">Orchestration context that supplies the input and schedules activities.</param>
/// <param name="log">Function logger; wrapped in a replay-safe logger before use.</param>
[FunctionName("Import_ImportTask")]
public async Task ImportImportTask([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    // This orchestrator body is replayed from the top after every await. Without the
    // replay-safe wrapper the log lines below are written again on each replay, which
    // makes it look as if the activities were being invoked again.
    log = context.CreateReplaySafeLogger(log);

    var importImportTaskRequest = context.GetInput<ImportImportTaskRequest>();

    // During replay this await is satisfied from the orchestration history: the
    // activity (and therefore the external API call) is not executed again once its
    // result has been recorded.
    var sourceItems = await context.CallActivityAsync<SourceItemMetadata[]>("Import_GetSourceItems", importImportTaskRequest);
    log.LogInformation($"SourceItems to process {sourceItems.Length}");

    var items = new List<Item>();
    foreach (var sourceItem in sourceItems)
    {
        log.LogInformation($"Processing item {sourceItem.Id}");
        items.Add(await context.CallActivityAsync<Item>("Import_Item", new ImportItemRequest(importImportTaskRequest, sourceItem.Url)));
    }

    // NOTE(review): this payload is (Item[], List<Item>, ImportImportTaskRequest), while
    // the Import_Save activity declares Tuple<ImportTask[], Item[], Guid> - confirm the
    // intended shape; left unchanged here.
    await context.CallActivityAsync("Import_Save", Tuple.Create(items.ToArray(), items, importImportTaskRequest));
}
/// <summary>
/// Activity that fetches one item from the external API and builds an <c>Item</c>
/// from the request's project id and the children derived by the internal library.
/// </summary>
/// <param name="importItemRequest">Request carrying the API key, item URL and project id.</param>
/// <param name="log">Logger for the function invocation.</param>
/// <returns>A new <c>Item</c> with <c>ProjectId</c> and <c>Childs</c> populated.</returns>
[FunctionName("Import_Item")]
public async Task<Item> ImportItem([ActivityTrigger] ImportItemRequest importItemRequest, ILogger log)
{
    var backendItem = await externalAPI.GetItem(new Security { ApiKey = importItemRequest.ApiKey }, importItemRequest.Url);

    // Pass the item that was just fetched; the original referenced an undefined
    // 'backendSubmission' here.
    var itemChilds = internalLibrary.GetItemChilds(importItemRequest, backendItem);
    log.LogInformation($"Item {importItemRequest.Url} return {itemChilds.Count} Childs");

    return new Item
    {
        ProjectId = importItemRequest.ProjectId,
        Childs = itemChilds
    };
}
/// <summary>
/// Activity that saves the first element of the incoming tuple through the repository.
/// The remaining tuple elements are not read here.
/// </summary>
/// <param name="saveRequest">Tuple payload; only <c>Item1</c> is saved.</param>
/// <param name="log">Logger for the function invocation (unused).</param>
[FunctionName("Import_Save")]
public async Task SaveItems([ActivityTrigger] Tuple<ImportTask[],Item[],Guid> saveRequest, ILogger log)
{
    var recordsToPersist = saveRequest.Item1;
    await repository.SaveToDb(recordsToPersist);
}
/// <summary>
/// Activity that asks the external API for the list of source items, using the API key
/// and URL carried by the request.
/// </summary>
/// <param name="apiRequest">Request carrying the API key and the URL to query.</param>
/// <param name="log">Logger for the function invocation (unused).</param>
/// <returns>The source item metadata returned by the external API.</returns>
[FunctionName("Import_GetSourceItems")]
public async Task<SourceItemMetadata[]> GetSourceItems([ActivityTrigger] ImportTaskRequest apiRequest, ILogger log)
{
    // Read the values from the bound parameter; the original accessed them through
    // the type name (ImportTaskRequest.ApiKey / ImportTaskRequest.Url).
    var security = new Security { ApiKey = apiRequest.ApiKey };
    return await externalAPI.GetItems(security, apiRequest.Url);
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
