Inserting into DynamoDB in batches of 25 is skipping some rows

When I try to put data into DynamoDB in batches with the code below, some of the rows are not inserted. I tried reducing the batch size to 3, but some rows were still missing. I am sure the data we are trying to insert is not more than 10 MB in each batch. Can you please suggest a better alternative, or should I process the items individually rather than in batches?

private async Task<int> DumpDynamoDB(IList<MatItem> response, ILambdaLogger logger)
{
    int count = 0;

    var client = new AmazonDynamoDBClient();

    var materialsDict = new Dictionary<string, List<DynamoMaterial>>();

    // Group the incoming items by their composite key.
    foreach (var item in response)
    {
        var key = $"{item.Prop1}-{item.Prop2}";

        if (!materialsDict.ContainsKey(key))
        {
            materialsDict[key] = new List<DynamoMaterial>();
        }
        // Add the item to its key's list.
        materialsDict[key].Add(new DynamoMaterial() { MD = item.MD, SO = item.SO, MN = item.MN });
    }

    int totalBatches = materialsDict.Count / 25 + (materialsDict.Count % 25 > 0 ? 1 : 0);
    List<Task<BatchWriteItemResponse>> batches = new List<Task<BatchWriteItemResponse>>();
    List<WriteRequest> writeRequests;
    BatchWriteItemRequest batch25item;
    KeyValuePair<string, List<DynamoMaterial>> dictItem;

    // Create a batch request of up to 25 rows at a time. Each batch is sent as a separate asynchronous request.
    for (int i = 0; i < totalBatches; i++)
    {
        writeRequests = new List<WriteRequest>();
        for (; writeRequests.Count < 25 && count < materialsDict.Count; count++)
        {
            dictItem = materialsDict.ElementAt(count);
            writeRequests.Add(
                new WriteRequest
                {
                    PutRequest = new PutRequest
                    {
                        Item = new Dictionary<string, AttributeValue>
                        {
                            { "PK", new AttributeValue { S = dictItem.Key } },
                            { "Column1", new AttributeValue { S = JsonConvert.SerializeObject(dictItem.Value) } },
                            { "COUNT", new AttributeValue { S = dictItem.Value.Count.ToString() } },
                            { "TimeToExist", new AttributeValue { N = AWSSDKUtils.ConvertToUnixEpochSeconds(DateTime.Now.AddMonths(1)).ToString() } }
                        }
                    }
                });
            logger.Log($"Batch item added for Item Key : {dictItem.Key}");
        }
        // Batch request with up to 25 rows.
        batch25item = new BatchWriteItemRequest
        {
            RequestItems = new Dictionary<string, List<WriteRequest>>
            {
                { EnvironmentHelper.DynamoTableName, writeRequests }
            }
        };
        batches.Add(client.BatchWriteItemAsync(batch25item));
    }

    // Wait until all the batches have finished.
    await Task.WhenAll(batches);

    return count;
}


Solution 1:[1]

Although DynamoDB has limits on the number of requests and their size, above which BatchWriteItem and BatchGetItem are rejected outright, it is not guaranteed that every batch below these limits will fully succeed. The BatchWriteItem documentation explains that one of the reasons why a BatchWriteItem operation may only partially succeed is exceeded provisioned throughput, and what you need to do in that case:

If any requested operations fail because the table's provisioned throughput is exceeded or an internal processing failure occurs, the failed operations are returned in the UnprocessedItems response parameter. You can investigate and optionally resend the requests. Typically, you would call BatchWriteItem in a loop. Each iteration would check for unprocessed items and submit a new BatchWriteItem request with those unprocessed items until all items have been processed.

I have also seen these operations partially succeed in other cases that did not involve provisioned throughput.

So the conclusion is that correctly written code must call BatchWriteItem in a loop, resending any UnprocessedItems returned by the previous call. If you don't do this, those items will simply not be written. Many AWS toolkits have higher-level functions, such as batch_writer() in Python's boto3, which take care of all of this for you: deciding how to group the writes into batches, and when to retry items that weren't fully written.
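For completeness, here is a minimal C# sketch of such a loop against the SDK used in the question (AWSSDK.DynamoDBv2). The helper name, the backoff delay, and the attempt limit are illustrative assumptions, not part of the original code:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

// Writes one batch, resending UnprocessedItems until DynamoDB accepts
// everything. The delay and attempt limit are arbitrary example values.
private static async Task WriteBatchWithRetries(
    IAmazonDynamoDB client,
    Dictionary<string, List<WriteRequest>> requestItems,
    int maxAttempts = 8)
{
    var delay = TimeSpan.FromMilliseconds(100);
    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
        var response = await client.BatchWriteItemAsync(requestItems);
        if (response.UnprocessedItems.Count == 0)
        {
            return; // everything was written
        }
        // Retry only what DynamoDB could not process, with exponential backoff.
        requestItems = response.UnprocessedItems;
        await Task.Delay(delay);
        delay = TimeSpan.FromMilliseconds(delay.TotalMilliseconds * 2);
    }
    throw new Exception($"UnprocessedItems remained after {maxAttempts} attempts.");
}

Each 25-item batch from the question could then be awaited through a helper like this instead of calling BatchWriteItemAsync directly. Solution 2 below shows another way to structure the same retry, working across all of the question's batches at once.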

Solution 2:[2]

I created the method below and called it after the await Task.WhenAll() in the question.

private async Task RerunUnprocessedItems(List<Task<BatchWriteItemResponse>> batches, AmazonDynamoDBClient client)
{
    List<Task<BatchWriteItemResponse>> unprocessedBatches;
    do
    {
        unprocessedBatches = new List<Task<BatchWriteItemResponse>>();
        // All tasks in "batches" have already completed, so reading Result does not block.
        batches.ForEach(b =>
        {
            if (b.Result.UnprocessedItems.Count > 0)
            {
                // Resend only the items DynamoDB did not process.
                unprocessedBatches.Add(client.BatchWriteItemAsync(b.Result.UnprocessedItems));
            }
        });
        await Task.WhenAll(unprocessedBatches);
        batches = unprocessedBatches;
    } while (batches.Count > 0);
}
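For reference, a sketch of how this method slots into the question's DumpDynamoDB, reusing the batches list and client variables from the question:

// Inside DumpDynamoDB, after the initial round of writes:
await Task.WhenAll(batches);
await RerunUnprocessedItems(batches, client);

Note that this version retries immediately in a tight loop; if writes are failing because of throttling, adding a growing delay between rounds (as in the sketch under Solution 1) gives the table time to recover.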

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Nadav Har'El
Solution 2: Jeremy Caney