How to correctly set up transient EMR steps in JSON?

I'm really new to creating transient EMR clusters, and I need help understanding how to configure each step of the transient EMR job. Essentially, three things need to happen:

  1. Spin up the cluster with the needed memory and nodes.
  2. Execute the extraction.py file, which loops through an entire folder of SQL files.
  3. Terminate the cluster.
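The three stages above map onto a single EMR RunJobFlow request. The request itself creates the cluster, so no step is needed for that. The extraction runs as a step. With KeepJobFlowAliveWhenNoSteps set to False, EMR terminates the cluster once there are no steps left to run.

Below is a minimal sketch that builds the keyword arguments for boto3's run_job_flow. Several values are illustrative assumptions, not taken from the question:

- the cluster name, release label, instance types and counts, and the log bucket;
- the helper name build_transient_cluster_request;
- the default role names EMR_DefaultRole and EMR_EC2_DefaultRole, which only work if those roles exist in the account.

```python
def build_transient_cluster_request(steps, log_uri, core_nodes=4):
    """Build keyword arguments for boto3's EMR client run_job_flow().

    All concrete values (name, release label, instance types, role names)
    are illustrative assumptions; adapt them to your account.
    """
    return {
        "Name": "transient-sql-extraction",  # hypothetical cluster name
        "ReleaseLabel": "emr-6.15.0",  # assumed EMR release
        "LogUri": log_uri,  # step logs are archived under this S3 prefix
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "InstanceGroups": [
                {"Name": "Primary", "Market": "ON_DEMAND", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"Name": "Core", "Market": "ON_DEMAND", "InstanceRole": "CORE",
                 "InstanceType": "r5.2xlarge", "InstanceCount": core_nodes},
            ],
            # Transient cluster: shut down automatically after the last step
            "KeepJobFlowAliveWhenNoSteps": False,
            "TerminationProtected": False,
        },
        "Steps": steps,  # list of step dicts, same shape as the JSON "Steps"
        "JobFlowRole": "EMR_EC2_DefaultRole",  # default EC2 instance profile
        "ServiceRole": "EMR_DefaultRole",  # default EMR service role
        "VisibleToAllUsers": True,
    }


# Usage (requires boto3 and AWS credentials; not executed here):
#   import boto3
#   request = build_transient_cluster_request(steps, "s3://my-bucket/emr-logs/")
#   cluster_id = boto3.client("emr").run_job_flow(**request)["JobFlowId"]
```

In a CloudFormation template, the corresponding setting is KeepJobFlowAliveWhenNoSteps inside the cluster's Instances block.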

Sample Code:

Steps in config.json:

"Steps": [
    {
        "Name": "EMR Creation",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "client",
                "--master", "yarn",
                { "Fn::Sub": [ "${Timestamp}", { "Timestamp": { "Ref": "Timestamp" } } ] },
                { "Fn::Sub": [ "${ProcessPattern}", { "ProcessPattern": { "Ref": "ProcessPattern" } } ] },
                "--executors", "5",
                "--cores", "4",
                "--memory", "40G",
                "--driver", "20G",
                { "Fn::FindInMap": [ "EnvironmentMap", { "Ref": "Environment" }, "Env" ] }
            ]
        }
    },
    {
        "Name": "SQL Extraction",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "extraction_pull.handler.py"
            ]
        }
    },
    {
        "Name": "Beardown_n_Teardown",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "emr_downtime.py"
            ]
        }
    }
]
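In a step that uses command-runner.jar, the first entry of Args is executed as a command on the primary node, and the remaining entries are passed to it as arguments. An Args list containing only "extraction_pull.handler.py" therefore asks EMR to run that file as a program. Unless the file is executable and on the node's PATH, the step is likely to fail at that point.

The first step has two more things worth checking:

- The cluster already exists by the time any step runs, so a step is not what creates it.
- If --executors, --cores, --memory and --driver are meant as Spark resource settings, spark-submit's own flags for them are --num-executors, --executor-cores, --executor-memory and --driver-memory. They must come before the application file.

The extraction code talks to SQL Server rather than using Spark, so one possible layout is to copy the code to the node and run it with python3, as sketched below. This sketch rests on several assumptions:

- The S3 prefix, the local directory, and the helper name build_extraction_steps are hypothetical.
- The script's dependencies (such as the SQL Server driver) are already installed on the node, for example by a bootstrap action.

If the cluster is set to terminate after its last step, the separate teardown step is not needed.

```python
import json


def build_extraction_steps(code_s3_uri, local_dir="/home/hadoop/extraction"):
    """Return an EMR Steps list: copy the code from S3, then run the extraction.

    code_s3_uri and local_dir are hypothetical; adapt them to your layout.
    """
    def step(name, args, on_failure="TERMINATE_CLUSTER"):
        # command-runner.jar runs args[0] as a command, the rest as its arguments
        return {
            "Name": name,
            "ActionOnFailure": on_failure,
            "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
        }

    return [
        # 1. Copy the script, its helper package, and the .sql files onto the node
        step("Copy code", ["aws", "s3", "cp", code_s3_uri, local_dir, "--recursive"]),
        # 2. Run the script with the node's Python 3 interpreter
        step("SQL Extraction", ["python3", local_dir + "/extraction.py"]),
    ]


if __name__ == "__main__":
    # Print the steps in the same JSON shape as the "Steps" array in config.json
    print(json.dumps(build_extraction_steps("s3://my-bucket/extraction/"), indent=2))
```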

The extraction.py file is straightforward; it just calls some other code in a Git repo:

from db_layer.mssql_exctraction_call import JerryEtlCall

def handle(event, context):
    # Create the extraction object for this event, then run every query
    JCall = JerryEtlCall(event)
    JCall.call_engine()

    return "SUCCESS"
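When extraction.py is launched as an EMR step (for example with python3 extraction.py), Python runs the module top to bottom, but nothing calls handle(event, context). That signature is only invoked automatically by AWS Lambda, so the script needs a small entry point of its own.

The sketch below assumes the event can be passed as an optional JSON command-line argument. run_as_script is a hypothetical helper. Exiting with a non-zero code on a result other than "SUCCESS" is one way to let the step's status reflect the outcome.

```python
import json
import sys


def run_as_script(handler, argv=None):
    """Call a Lambda-style handler(event, context) from the command line.

    The event comes from an optional JSON string in argv[1]; EMR provides no
    Lambda context object, so None is passed in its place.
    """
    argv = sys.argv if argv is None else argv
    event = json.loads(argv[1]) if len(argv) > 1 else {}
    return handler(event, None)


# At the bottom of extraction.py (handle is the function defined above):
# if __name__ == "__main__":
#     result = run_as_script(handle)
#     # A non-zero exit code should make EMR mark the step as failed
#     sys.exit(0 if result == "SUCCESS" else 1)
```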

The extraction_call does something like this:

for sql_filename in os.listdir(self.SQL_FOLDER):
    if not sql_filename.endswith('.sql'):
        print('{} is not a .sql file.'.format(sql_filename))
        continue
    with open(os.path.join(self.SQL_FOLDER, sql_filename), 'r') as sql_in:
        cursor = connection.cursor()
        s3_filename = os.path.join(self.S3_FOLDER, sql_filename[:-4])
        lines = sql_in.read()
        mssql_query = textwrap.dedent(lines)

        try:
            cursor.execute(mssql_query)
            print(sql_filename + ' was executed successfully.')
        except Exception as mssql_error:
            print('{0} was not pulled because of an error'.format(sql_filename))
            print(mssql_error)
            cursor.close()
            connection.rollback()
            continue
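The loop above can be tried without SQL Server by using Python's built-in sqlite3 module as a stand-in DB-API connection. This is purely an assumption for illustration; the real code presumably uses an MSSQL driver.

The sketch keeps the same behavior:

- It skips files that don't end in .sql.
- It rolls back and moves on when a query fails.

It also makes two small changes. It builds the S3 key with posixpath so it always uses forward slashes, and it closes the cursor in every case. The function name run_sql_folder and its returned dictionary are hypothetical. The point where the real code would fetch rows and upload them to S3 is left as a comment.

```python
import os
import posixpath
import sqlite3
import tempfile
import textwrap


def run_sql_folder(connection, sql_folder, s3_folder):
    """Execute each .sql file in sql_folder; return {filename: (ok, s3_key)}."""
    results = {}
    for sql_filename in sorted(os.listdir(sql_folder)):
        if not sql_filename.endswith(".sql"):
            print("{} is not a .sql file.".format(sql_filename))
            continue
        # S3 key: folder + file name without ".sql"; S3 keys always use "/"
        s3_key = posixpath.join(s3_folder, sql_filename[:-4])
        with open(os.path.join(sql_folder, sql_filename)) as sql_in:
            query = textwrap.dedent(sql_in.read())
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            # The real extraction would fetch rows here and upload them to s3_key
            print(sql_filename + " was executed successfully.")
            results[sql_filename] = (True, s3_key)
        except Exception as err:
            print("{} was not pulled because of an error: {}".format(sql_filename, err))
            connection.rollback()
            results[sql_filename] = (False, s3_key)
        finally:
            cursor.close()
    return results


if __name__ == "__main__":
    # Demo: an in-memory sqlite3 database stands in for SQL Server
    with tempfile.TemporaryDirectory() as folder:
        for name, text in [("ok.sql", "SELECT 1"), ("bad.sql", "SELEC 1"), ("notes.txt", "")]:
            with open(os.path.join(folder, name), "w") as f:
                f.write(text)
        print(run_sql_folder(sqlite3.connect(":memory:"), folder, "extracts"))
```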

The SQL files are in a different directory, which contains 5 files. Once those queries have been extracted, I just need to terminate the EMR cluster. The queries are too large to be handled by a regular Lambda, which is why I decided to use a transient cluster. Because of how the environment is set up, the data can only be pulled using Lambdas. It's a bit tricky, but that's what I have to work with.

The step where I spin up the cluster works just fine; however, when I add my next step, extraction.py, it begins to fail. I am pretty new to transient EMR clusters, so I do need some help with this. Could someone please provide some guidance? Thank you for the help!
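When a step fails, its stderr usually says why. If the cluster is launched with a LogUri, EMR archives each step's logs under <LogUri>/<cluster-id>/steps/<step-id>/ as gzipped controller, stderr, stdout, and syslog files. They can take several minutes to appear in S3.

The small helper below just assembles those S3 URIs. The helper name step_log_uris and the example IDs are hypothetical.

```python
import posixpath


def step_log_uris(log_uri, cluster_id, step_id):
    """Return the S3 URIs of the archived log files for one EMR step.

    Assumes the cluster was started with LogUri=log_uri.
    """
    base = posixpath.join(log_uri.rstrip("/"), cluster_id, "steps", step_id)
    return {name: base + "/" + name + ".gz"
            for name in ("controller", "stderr", "stdout", "syslog")}


# Example with made-up IDs: the file most likely to explain a failure
print(step_log_uris("s3://my-bucket/emr-logs/", "j-EXAMPLE", "s-EXAMPLE")["stderr"])
```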



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
