GCP Dataflow Error: "Failure getting groups, quitting"

I have a batch Dataflow pipeline that reads a csv file from a cloud storage bucket folder, processes the data and writes to a new file in the same bucket subfolder, and is triggered by a cloud function when a new file is uploaded to cloud storage. The pipeline runs fine and produces the desired output when testing with a small csv file (25 rows) but fails to write the output file when processing larger files, producing the error: "Failure getting groups, quitting".

Full error from logs explorer:

{
  "insertId": "s=1f9f52b3276640528b537fd9e09a6c74;i=29b;b=715c0571349543b08fc296a56da392cb;m=b2fd5f;t=5d66d3d6020cf;x=8fb1cd537c367ea3",
  "jsonPayload": {
    "message": "Failure getting groups, quitting"
  },
  "resource": {
    "type": "dataflow_step",
    "labels": {
      "project_id": "my-project",
      "job_name": "Generate Clutch Product Code URLs - d29c0a",
      "job_id": "2022-01-25_11_38_17-5732726158246265518",
      "region": "us-central1",
      "step_id": ""
    }
  },
  "timestamp": "2022-01-25T19:39:13.042639Z",
  "severity": "ERROR",
  "labels": {
    "dataflow.googleapis.com/log_type": "system",
    "compute.googleapis.com/resource_name": "generateclutchproductcode-01251138-5h0y-harness-63q4",
    "compute.googleapis.com/resource_type": "instance",
    "dataflow.googleapis.com/job_name": "Generate Clutch Product Code URLs - d29c0a",
    "compute.googleapis.com/resource_id": "3115486816356921127",
    "dataflow.googleapis.com/region": "us-central1",
    "dataflow.googleapis.com/job_id": "2022-01-25_11_38_17-5732726158246265518"
  },
  "logName": "projects/my-project/logs/dataflow.googleapis.com%2Fsystem",
  "receiveTimestamp": "2022-01-25T19:39:23.792851821Z"
}

In addition to the error, I also get the following warning (which may or may not be related):

Discarding unparseable args: ['--beam_plugins=apache_beam.io.filesystem.FileSystem', '--beam_plugins=apache_beam.io.hadoopfilesystem.HadoopFileSystem', '--beam_plugins=apache_beam.io.localfilesystem.LocalFileSystem', '--beam_plugins=apache_beam.io.gcp.gcsfilesystem.GCSFileSystem', '--beam_plugins=apache_beam.io.aws.s3filesystem.S3FileSystem', '--beam_plugins=apache_beam.io.azure.blobstoragefilesystem.BlobStorageFileSystem', '--pipeline_type_check', '--pipelineUrl=gs://my-project-dataflows/Templates/staging/beamapp-user-0125193126-815021.1643139086.815242/pipeline.pb', '--gcpTempLocation=gs://dataflow-staging-us-central1-883825732987/temp', '--autoscalingAlgorithm=NONE', '--numWorkers=2', '--direct_runner_use_stacked_bundle', '--templateLocation=gs://my-project-dataflows/Templates/Generate_Clutch_Product_Codes.py', '--maxNumWorkers=0', '--dataflowJobId=2022-01-25_11_38_17-5732726158246265518', '--job_server_timeout=60']

My pipeline code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import WriteToText
import logging
import traceback
import csv
import sys
import logging
from cryptography.fernet import Fernet
from csv import reader, DictReader, DictWriter
import google.auth
from google.cloud import storage

class CustomOptions(PipelineOptions):
    """Job-specific pipeline options for the product-code URL generator."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the custom command-line flags on *parser*.

        All flags are plain strings; Beam surfaces them as attributes on the
        options object (e.g. ``options.fernet_key``).
        """
        flag_specs = (
            ('--fernet_key', 'Fernet secret used to serialize product codes'),
            ('--bucket', 'Cloud Storage bucket containing relevant files'),
            ('--input_file', 'File containing product codes to convert'),
            ('--output_file', 'Destination of the new file'),
        )
        for flag, help_text in flag_specs:
            parser.add_argument(flag, type=str, help=help_text)

def generate_product_code_urls_pipeline(project, env, region):
    """Build and run the Dataflow job that converts product codes to URLs.

    Reads a CSV of product codes from GCS, Fernet-encrypts each code into a
    scan URL, writes the URLs to a single output CSV in the same bucket, and
    then patches the output object's Content-Type metadata.

    Args:
        project: GCP project id used for the Dataflow job and subnetwork URL.
        region: Dataflow region (e.g. 'us-central1').
        env: Environment name ('dev', ...) interpolated into bucket/subnet names.

    NOTE(review): template_location points at a '.py' path; Dataflow templates
    are serialized pipeline graphs, not Python files — verify this is the
    intended template object name.
    """
    options = PipelineOptions(
        streaming=False,
        project=project,
        region=region,
        staging_location=f'gs://my-project-{env}-dataflows/Templates/staging',
        temp_location=f'gs://my-project-{env}-dataflows/Templates/temp',
        template_location=f'gs://my-project-{env}-dataflows/Templates/Generate_Clutch_Product_Codes.py',
        subnetwork=f'https://www.googleapis.com/compute/v1/projects/{project}/regions/us-central1/subnetworks/{env}-private'
    )
    custom_options = options.view_as(CustomOptions)
    # save_main_session pickles module-level state so workers can see it.
    custom_options.view_as(SetupOptions).save_main_session = True
    # NOTE(review): custom_options values are read at pipeline-CONSTRUCTION
    # time below (input/output paths, fernet_key). For a staged template these
    # are baked in when the template is built, not when the Cloud Function
    # launches it — presumably ValueProvider options are needed; confirm.
    logging.info(f'Custom Options: {custom_options}')
    
    # Transform function: one input CSV row (a product code) -> scan URL.
    def genURLs(code):
        # Local import so the name resolves on remote workers.
        from cryptography.fernet import Fernet
        f = Fernet(custom_options.fernet_key)
        encoded = code.encode()
        encrypted = f.encrypt(encoded)
        # Round-trip check: decrypting the ciphertext must yield the input.
        decrypted = f.decrypt(encrypted.decode().encode())
        decoded = decrypted.decode()
        if code != decoded:
            logging.info(f'Original product code {code}, and decoded code {decoded} do not match')
            # NOTE(review): sys.exit(1) inside a worker kills the harness
            # process rather than failing the element — likely related to the
            # "Failure getting groups, quitting" shutdown; prefer raising an
            # exception. TODO confirm.
            sys.exit(1)
        url = 'https://my-url.com/scan?code=' + encrypted.decode()
        return url
    
    # DoFn that patches the written object's Content-Type to text/csv.
    class UpdateMetadata(beam.DoFn):

        def __init__(self, bucket_name):
            # Bucket name only; the GCS client cannot be pickled, so it is
            # created lazily in start_bundle.
            self.bucket_name = bucket_name

        def start_bundle(self):
            # Local import + per-bundle client so workers construct their own.
            from google.cloud import storage
            self.client = storage.Client()

        def process(self, urls):
            logging.info(f'Updating object metadata...')
            bucket = self.client.bucket(self.bucket_name)
            # Fetches the blob written by WriteToText (shard_name_template=''
            # makes the output name exactly custom_options.output_file).
            blob = bucket.get_blob(custom_options.output_file)
            blob.content_type = 'text/csv'
            blob.patch()      

    # End function
    p = beam.Pipeline(options=options)
    # NOTE(review): chaining a ParDo onto WriteToText's result relies on the
    # write returning a PCollection; on some Beam versions text sinks return
    # PDone, which cannot be further transformed — verify against the Beam
    # version in use.
    (p | 'Read Input CSV' >> beam.io.ReadFromText(f'gs://{custom_options.bucket}/{custom_options.input_file}', skip_header_lines=1)
       | 'Map Codes' >> beam.Map(genURLs)
       | 'Write PCollection to Bucket' >> WriteToText(f'gs://{custom_options.bucket}/{custom_options.output_file}', num_shards=1, shard_name_template='', header='URL')
       | 'Update Object Metadata' >> beam.ParDo(UpdateMetadata(custom_options.bucket)))
    
    # NOTE(review): run() is not followed by wait_until_finish(), so the
    # launcher returns before the job completes — intentional for template
    # staging, but worth confirming.
    p.run()

# Pipeline execution
try:
    region = 'us-central1'
    env = 'dev'
    cred, project = google.auth.default()

    generate_product_code_urls_pipeline(project, env, region)    
    logging.info('\n PIPELINE FINISHED \n')
except (KeyboardInterrupt, SystemExit):
    raise
except:
    logging.error('\n PIPELINE FAILED')
    traceback.print_exc()

What's more, the job graph shows that all steps were successfully completed. It seems like it could be an issue with the workers writing the file to the desired location, but that's my best guess as I've had trouble finding information about this error. Any further info or suggestions would be a huge help and very appreciated.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source