AWS Step Functions: Consuming messages from SQS

I am consuming messages from SQS to trigger queries. When I normally consume a message from SQS in Python, I need to delete the message from SQS. Do I have to manually delete the message from SQS in a Step Function? What is the best/simplest way to do so?

I believe I have the SQS integration working:

{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "******"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Run Analysis Queries",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)"
      }
    },
    "Run Analysis Queries": {
      "Type": "Task",
      "Parameters": {
        "ClusterIdentifier": "******",
        "Database": "prod",
        "Sql": "select * from ******"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "End": true
    }
  },
  "TimeoutSeconds": 3600
}

I just ran a test, and the number of messages in the queue drops temporarily but then goes back up.

Is the best approach to insert a Lambda between the "ReceiveMessage from SQS" state and the Redshift state?

This raises another question. So far I have only run this manually. How do I eventually get this Step Function to run on every incoming message?



Solution 1:[1]

If you must use SQS, you will need a Lambda function to act as a proxy. Set up the queue as a Lambda trigger, and write a Lambda that parses the SQS message and makes the appropriate call to the Step Functions StartExecution API.
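A minimal sketch of such a proxy Lambda in Python, assuming each SQS message body is JSON and that the state machine ARN is supplied through an environment variable (the name STATE_MACHINE_ARN is hypothetical). The request building is kept in a separate function so it can be tried locally without AWS access:

```python
import json


def build_start_execution_requests(event, state_machine_arn):
    """Turn an SQS-triggered Lambda event into StartExecution keyword arguments."""
    requests = []
    for record in event.get("Records", []):
        # StartExecution expects `input` to be a JSON string. Parsing and
        # re-serializing the body raises early if a message is not valid JSON.
        payload = json.loads(record["body"])
        requests.append(
            {
                "stateMachineArn": state_machine_arn,
                "input": json.dumps(payload),
            }
        )
    return requests


def lambda_handler(event, context):
    # Imported here so the helper above can be tested without boto3 installed.
    import os
    import boto3

    sfn = boto3.client("stepfunctions")
    arn = os.environ["STATE_MACHINE_ARN"]  # hypothetical variable name
    for request in build_start_execution_requests(event, arn):
        sfn.start_execution(**request)
```

With an SQS trigger, Lambda removes the batch from the queue when the handler returns without raising, so this route needs no explicit deleteMessage call.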

Solution 2:[2]

After you consume a message, you have to delete it using sqs:deleteMessage. The message reappears in the queue because reading it only hides it for the visibility timeout (30 seconds by default), so that other consumers do not process it at the same time. If it has not been deleted by then, it becomes visible again.
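To make the reappearing message concrete, here is a toy in-memory model of the visibility timeout in Python. It is only an illustration: ToyQueue and its methods are made up for this sketch and are not the SQS API (for instance, real SQS issues a new receipt handle on every receive).

```python
class ToyQueue:
    """In-memory model of the SQS visibility timeout; not the real SQS API."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # handle -> [body, invisible_until]
        self._next_id = 0

    def send(self, body):
        self._next_id += 1
        self._messages[f"handle-{self._next_id}"] = [body, 0.0]

    def receive(self, now):
        """Return (handle, body) for the first visible message, or None."""
        for handle, entry in self._messages.items():
            if entry[1] <= now:
                # Receiving does not remove the message, it only hides it.
                entry[1] = now + self.visibility_timeout
                return handle, entry[0]
        return None

    def delete(self, handle):
        self._messages.pop(handle, None)

    def visible_count(self, now):
        return sum(1 for _, until in self._messages.values() if until <= now)


queue = ToyQueue(visibility_timeout=30)
queue.send('{"table": "sales"}')
handle, body = queue.receive(now=0)
print(queue.visible_count(now=10))  # 0: hidden while the timeout runs
print(queue.visible_count(now=31))  # 1: never deleted, so it is back
queue.delete(handle)
print(queue.visible_count(now=31))  # 0: gone for good after the delete
```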

Here is an example of how to read, process, and delete a message from the queue. Note that I set MaxNumberOfMessages to 1 and gave the Redshift task a ResultPath other than $, so that its result does not overwrite the state input:

"ReceiveMessage from SQS": {
  "Type": "Task",
  "Parameters": {
    "MaxNumberOfMessages": 1,
    "QueueUrl": "******"
  },
  "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
  "Next": "Run Analysis Queries",
  "ResultSelector": {
    "body.$": "States.StringToJson($.Messages[0].Body)",
    "receiptHandle.$": "$.Messages[0].ReceiptHandle"
  }
},
"Run Analysis Queries": {
  "Type": "Task",
  "Parameters": {
    "ClusterIdentifier": "******",
    "Database": "prod",
    "Sql": "select * from ******"
  },
  "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
  "ResultPath": "$.redshift_output",
  "Next": "delete_sqs"
},
"delete_sqs": {
  "Comment": "Deletes SQS message",
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
  "Parameters": {
    "ReceiptHandle.$": "$.receiptHandle",
    "QueueUrl": "******"
  },
  "ResultPath": null,
  "Next": "update_result"
}
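Why the ResultPath matters can be sketched in a few lines of Python. This is a simplified model of how Step Functions merges a task result into the state input, covering only "$", null, and a single top-level key; the function name and the sample values are made up for illustration.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Toy model of ResultPath for "$", null (None), and a top-level "$.key"."""
    if result_path is None:
        # "ResultPath": null discards the result and passes the input through.
        return copy.deepcopy(state_input)
    if result_path == "$":
        # The default: the task result replaces the whole input.
        return copy.deepcopy(result)
    if not result_path.startswith("$.") or "." in result_path[2:]:
        raise ValueError("this sketch only handles $, null, and $.key")
    merged = copy.deepcopy(state_input)
    merged[result_path[2:]] = copy.deepcopy(result)
    return merged


# Hypothetical state input that still carries the receipt handle.
received = {"body": {"table": "sales"}, "receiptHandle": "abc"}
redshift_result = {"Id": "statement-1"}  # made-up executeStatement result

print(apply_result_path(received, redshift_result, "$"))
# {'Id': 'statement-1'}  (the receipt handle is lost)
print("receiptHandle" in apply_result_path(received, redshift_result, "$.redshift_output"))
# True  (the delete step can still read it)
```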

You can also read up to 10 messages at a time by setting MaxNumberOfMessages to 10 and processing them with a Map state, as in this example:

{
    "StartAt": "read_sqs",
    "States": {
      "read_sqs": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
        "Parameters": {
          "MaxNumberOfMessages": 10,
          "QueueUrl": "*******"
        },
        "ResultPath": "$.queueResponse",
        "Next": "check_results"
      },
      "check_results": {
        "Comment": "Checking if queue is empty",
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.queueResponse.Messages[0]",
            "IsPresent": true,
            "Next": "map_results"
          }
        ],
        "Default": "exit"
      },
      "map_results": {
        "Comment": "Performs a 'map' operation over each payload",
        "Type": "Map",
        "ItemsPath": "$.queueResponse.Messages",
        "MaxConcurrency": 10,
        "Iterator": {
          "StartAt": "read_request",
          "States": {
            "read_request": {
              "Comment": "Parses and moves the request body into the response",
              "Type": "Pass",
              "Parameters": {
                "requestBody.$": "States.StringToJson($.Body)"
              },
              "ResultPath": "$.map_response",
              "Next": "Run Analysis Queries"
            },
            "Run Analysis Queries": {
              "Type": "Task",
              "Parameters": {
                "ClusterIdentifier": "******",
                "Database": "prod",
                "Sql": "select * from ******"
              },
              "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
              "ResultPath": "$.redshift_output",
              "Next": "delete_sqs"
            },
            "delete_sqs": {
              "Comment": "Deletes SQS message",
              "Type": "Task",
              "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
              "Parameters": {
                "ReceiptHandle.$": "$.ReceiptHandle",
                "QueueUrl": "*******"
              },
              "ResultPath": null,
              "End": true
            }
          }
        },
        "ResultPath": "$.flowResponse",
        "Next": "exit"
      },
      "exit": {
        "Type": "Pass",
        "End": true
      }
    }
  }

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 AWS PS
Solution 2 Leopoldo Varela