Strange behavior trying to perform a data migration in DynamoDB

We're trying to perform a simple data migration on one of our tables in DynamoDB (DDB).

Basically, we're adding a new field and need to backfill all the documents in that table.

This table has around 700K documents.

The process we follow is quite simple:

  • Manually trigger a Lambda that scans the table and updates each document, continuing like that until it gets close to the 15-minute execution limit; at that point it:
  • Puts the LastEvaluatedKey into SQS to trigger a new Lambda execution that uses that key to continue scanning.
  • The process keeps spawning Lambdas sequentially as needed until there are no more documents (see the sketch below this list).
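
For reference, a minimal sketch of that scan-and-resume loop, assuming the AWS SDK for Java v1 (the same SDK as the function shared in the update below), a hypothetical table name "documents", and with the actual update step omitted; it is not our exact code:

    import java.time.Instant;
    import java.util.Map;

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import com.amazonaws.services.dynamodbv2.model.ScanRequest;
    import com.amazonaws.services.dynamodbv2.model.ScanResult;

    public class MigrationSliceSketch {

       private final AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.defaultClient();

       // Scans one "slice" of the table within the Lambda time budget and returns the key to resume from.
       public Map<String, AttributeValue> migrateSlice(Map<String, AttributeValue> startKey) {
          Instant deadline = Instant.now().plusSeconds(840); // stop ~1 minute before the 15-minute limit
          Map<String, AttributeValue> lastKey = startKey;

          do {
             ScanResult page = ddb.scan(new ScanRequest()
                   .withTableName("documents")        // hypothetical table name
                   .withExclusiveStartKey(lastKey));  // null on the very first call
             page.getItems().forEach(this::backfillItem);
             lastKey = page.getLastEvaluatedKey();    // null once the whole table has been scanned
          } while (lastKey != null && Instant.now().isBefore(deadline));

          // A non-null key means time ran out; the caller puts it on SQS so the next Lambda resumes here.
          return lastKey;
       }

       private void backfillItem(Map<String, AttributeValue> item) {
          // The UpdateItem / transactional write for the new field is omitted in this sketch.
       }
    }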

The problem we found is as follows...

Once the migration is done, we notice that the number of documents updated is way lower than the total number of documents in that table. The shortfall isn't the same every run; it ranges from tens of thousands to hundreds of thousands (the worst case we've seen was a difference of 300K).

This is obviously a problem: if we scan the documents again, it's clear that some of them were not migrated. At first we thought this was caused by clients updating/inserting documents, but the write throughput on that table is nowhere near large enough to justify such a big difference, so it isn't a matter of new documents being added while we run the migration.

We tried a second approach that scans first: if we only scan, the number of scanned documents equals the item count of the table, so we tried dumping the IDs of the documents into another table, then scanning that table and updating those items. Funny thing: the same problem happens with this new IDs-only table; it contains far fewer items than the count of the table we want to update, so we're back to square one.
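
For context, the ID dump in that second approach was conceptually along these lines (a rough sketch only, assuming a hypothetical "document_ids" staging table keyed on base_id; it is not the actual code):

    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
    import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
    import com.amazonaws.services.dynamodbv2.model.PutRequest;
    import com.amazonaws.services.dynamodbv2.model.WriteRequest;

    public class IdDumpSketch {

       // Writes up to 25 scanned ids into the staging table with a single BatchWriteItem call.
       static void dumpIds(AmazonDynamoDB ddb, List<String> ids) {
          List<WriteRequest> writes = ids.stream()
                .map(id -> new WriteRequest().withPutRequest(new PutRequest()
                      .withItem(Collections.singletonMap("base_id", new AttributeValue().withS(id)))))
                .collect(Collectors.toList());

          BatchWriteItemResult result = ddb.batchWriteItem(
                new BatchWriteItemRequest().withRequestItems(Collections.singletonMap("document_ids", writes)));

          // BatchWriteItem may return unprocessed items when throttled; this sketch retries them once.
          if (!result.getUnprocessedItems().isEmpty()) {
             ddb.batchWriteItem(new BatchWriteItemRequest()
                   .withRequestItems(result.getUnprocessedItems()));
          }
       }
    }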

We thought about using parallel scans, but I don't see how that would help, and I don't want to compromise the read capacity of the table while running the migration.
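
For completeness, a parallel scan would only change the scan request itself: each worker (or Lambda) owns one disjoint segment via Segment/TotalSegments, and a Limit can cap the page size. This is illustrative only (hypothetical table name and segment counts), not something we have run:

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.model.ScanRequest;
    import com.amazonaws.services.dynamodbv2.model.ScanResult;

    public class ParallelScanSketch {

       // Each worker scans one disjoint segment of the table; pages are still followed via LastEvaluatedKey.
       static ScanResult scanSegment(AmazonDynamoDB ddb, int segment, int totalSegments) {
          return ddb.scan(new ScanRequest()
                .withTableName("documents")        // hypothetical table name
                .withTotalSegments(totalSegments)  // e.g. 4 logical segments
                .withSegment(segment)              // this worker handles one of them
                .withLimit(100));                  // smaller pages keep read-capacity consumption per call modest
       }
    }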

Can anybody with experience in DDB data migrations shed some light here? We're not able to figure out what we're doing wrong.

UPDATE: sharing the function that is triggered and actually performs the scan and the updates:

    @Override
    public Map<String, AttributeValue> migrateDocuments(String lastEvaluatedKey, String typeKey) {

       LOG.info("Migrate Documents started {} ", lastEvaluatedKey);

       int noOfDocumentsMigrated = 0;
       Map<String, AttributeValue> docLastEvaluatedKey = null;

       DynamoDBMapperConfig documentConfig = new DynamoDBMapperConfig.TableNameOverride("KnowledgeDocumentMigration").config();
      
      if(lastEvaluatedKey != null) {
          docLastEvaluatedKey = new HashMap<String,AttributeValue>(); 
          docLastEvaluatedKey.put("base_id", new AttributeValue().withS(lastEvaluatedKey));
          docLastEvaluatedKey.put("type_key",new AttributeValue().withS(typeKey));
      }
      Instant endTime = Instant.now().plusSeconds(840);
      LOG.info("Migrate Documents endTime:{}", endTime);
     
      try {
        
         do { 
            
            ScanResultPage<Document> docScanList = documentDao.scanDocuments(docLastEvaluatedKey, documentConfig);
            docLastEvaluatedKey = docScanList.getLastEvaluatedKey();
            LOG.info("Migrate Docs- docScanList Size: {}", docScanList.getScannedCount());
            LOG.info("lastEvaluatedKey:{}", docLastEvaluatedKey);

            
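            // Split the scanned page into chunks of 25 to stay within the transactional write item limit.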
            final int chunkSize = 25;
            final AtomicInteger counter = new AtomicInteger();

            final Collection<List<Document>> docChunkList = docScanList.getResults().stream()
                  .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize)).values();

            List<List<Document>> docListSplit = docChunkList.stream().collect(Collectors.toList());
            docListSplit.forEach(docList -> {

               TransactionWriteRequest documentTx = new TransactionWriteRequest();

               for (Document document : docList) {
                  LOG.info("Migrate Documents- docList Size: {}", docList.size());
                 
                  LOG.info("Migrate Documents- Doc Id: {}", document.getId());

                  if (!StringUtils.isNullOrEmpty(document.getType()) && document.getType().equalsIgnoreCase("Faq")) {
                     
                     if (docIdsList.contains(document.getId())) {
                        LOG.info("this doc already migrated:{}", document);
                     } else {
                        docIdsList.add(document.getId());
                     }

                     if ((!StringUtils.isNullOrEmpty(document.getFaq().getQuestion()))) {
                        LOG.info("doc FAQ {}", document.getFaq().getQuestion());
                        document.setTitle(document.getFaq().getQuestion());
                        document.setTitleSearch(document.getFaq().getQuestion().toLowerCase());
                        documentTx.addUpdate(document);
                     }
                  } else if (StringUtils.isNullOrEmpty(document.getType())) {
                     if (!StringUtils.isNullOrEmpty(document.getTitle()) ) {
                        if (!StringUtils.isNullOrEmpty(document.getQuestion())) {
                           document.setTitle(document.getQuestion());
                           document.setQuestion(null);
                        }
                        LOG.info("title {}", document.getTitle());
                        document.setTitleSearch(document.getTitle().toLowerCase());
                        documentTx.addUpdate(document);
                     }
                  }

               }

               if (documentTx.getTransactionWriteOperations() != null
                     && !documentTx.getTransactionWriteOperations().isEmpty() && docList.size() > 0) {

                  LOG.info("DocumentTx size {}", documentTx.getTransactionWriteOperations().size());
                  documentDao.executeTransaction(documentTx, null);
               }
            });
             
            noOfDocumentsMigrated = noOfDocumentsMigrated + docScanList.getScannedCount();
         }while(docLastEvaluatedKey != null &&  (endTime.compareTo(Instant.now()) > 0));
         
         LOG.info("Migrate Documents execution finished at:{}", Instant.now());
         
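         // If the scan did not finish within the time budget, hand the resume key to SQS so the next Lambda continues from it.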
         if(docLastEvaluatedKey != null && docLastEvaluatedKey.get("base_id") != null)
            sqsAdapter.get().sendMessage(docLastEvaluatedKey.get("base_id").toString(), docLastEvaluatedKey.get("type_key").toString(),
                  MIGRATE, MIGRATE_DOCUMENT_QUEUE_NAME);
          
         LOG.info("No Of Documents Migrated:{}", noOfDocumentsMigrated);
         
       }catch(Exception e) {
          LOG.error("Exception", e);
       }
       return docLastEvaluatedKey;
      
    }


Solution 1:[1]

Note: I would've added this speculation as a comment, but my reputation does not allow it.

I think the issue you're seeing here could be caused by the Scans not being ordered. As long as your Scan is executed within a single Lambda, I'd expect everything to be handled fine. However, as soon as you hit the Lambda runtime limit and start a new one, your Scan essentially gets a new "scan ID", which may return items in a different order. Because of that different order, you end up skipping a certain set of entries.

I haven't tried to replicate this behavior, and sadly there is no clear indication in the AWS documentation of whether a scan request can be resumed from a new session/application.

I think @Charles' suggestion might help you in this case as you can simply run the entire migration in one process.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Roni