How to handle RuntimeException: GoogleJsonResponseException: 400 Bad Request error in Dataflow when streaming data into BigQuery

I built a streaming job in Dataflow with Java. My data source is a table in Bigtable; I read it into fixed windows (Apache Beam) every 3 minutes and run four transformations. Finally, I write to three destinations, and the error occurs constantly at the third one. I currently don't know the origin of the error. I even added .skipInvalidRows() when writing to BigQuery, but the error continues.

Below I show the steps of my pipeline.

Step: read every 3 minutes

    PCollection<List<ModelTableOfBigTable>> pCollectionTableOfBigTable = p
            .apply("", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(options.getWindowPeriod())))
            .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(options.getWindowPeriod()))))
            .apply("Scan TableOfBigTable", ParDo.of(new ScanTableOfBigTableDoFn(bigtableTableConfig, options)));

Step: transformations

pCollectionTableOfBigTable - PCollection<List<ModelTableOfBigTable>>
   -> FirstTransformationCustom 
        -> pCollectionToMean - PCollection<KV<String, Double>>
            -> Mean Transformation
                -> pCollectionMean - PCollection<KV<String, Double>>
                    -> SecondTransformationCustom
                       -> pCollectionToGroupByKey - PCollection<KV<String, KV<String, Double>>>
                          -> GroupByKey Transformation
                             -> pCollectionGroupByKey - PCollection<KV<String, Iterable<KV<String, Double>>>>
                                -> CalculateTransformationCustom
                                   -> pCollectionToFirestore - PCollection<ModelResultsCalculated>

Step: write to Firestore (works, no errors in the logs)

pCollectionToFirestore.apply("Write Firestore", ParDo.of(new FirestoreWriteDoFn<>(options)));

Step: write to logs for debugging (works)

pCollectionToFirestore.apply(
        "Logs",
        ParDo.of(
                new DoFn<ModelResultsCalculated, Void>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        ModelResultsCalculated result = c.element();
                        LOG.info("{}", result.toString());
                    }
                }
        ) );

Step: write to BigQuery (works, but shows an error in the logs)

Previously I used .skipInvalidRows(), but the error still appeared in the logs.

WriteResult writeResult = pCollectionToFirestore.apply("Write History BQ",
        BigQueryIO.<ModelResultsCalculated>write()
                .to(options.getBigqueryTableHistory())
                .withFormatFunction((ModelResultsCalculated result) -> Utils.createTableRow(result))
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
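
Since the write above already chains .withExtendedErrorInfo(), the returned WriteResult can expose the rejected rows together with their error payloads, which is usually more informative than the RuntimeException stack trace in the worker logs. A sketch of one way to tap that output, continuing from the writeResult variable above (the log message text is my own):

    // Consume the failed-insert side output that .withExtendedErrorInfo()
    // enables for STREAMING_INSERTS writes.
    writeResult
            .getFailedInsertsWithErr()
            .apply("Log Failed Inserts", ParDo.of(
                    new DoFn<BigQueryInsertError, Void>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            BigQueryInsertError err = c.element();
                            // err.getRow() is the rejected TableRow;
                            // err.getError() carries the per-row InsertErrors payload.
                            LOG.error("Failed insert: {} error: {}",
                                    err.getRow(), err.getError());
                        }
                    }));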

So when I look at the data in my BigQuery table, everything is fine, but when I look at the error logs in Dataflow I can see the following:

     java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/<dataset_name>/tables/<table_name>/insertAll?prettyPrint=false
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "No rows present in the request.",
    "reason" : "invalid"
  } ],
  "message" : "No rows present in the request.",
  "status" : "INVALID_ARGUMENT"
} 
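
As I understand it, "No rows present in the request" means the insertAll call itself carried an empty rows list, which is not the same as an individual row being invalid; that would explain why .skipInvalidRows() does not help. One thing worth ruling out is the format function (Utils.createTableRow in my pipeline) producing null or empty rows. A minimal, hedged sketch of such a guard in plain Java (a Map stands in for TableRow, and the field names are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class RowGuard {
    // Stand-in for com.google.api.services.bigquery.model.TableRow.
    // Hypothetical fields; adapt to whatever Utils.createTableRow builds.
    static Map<String, Object> createTableRow(Double value, String key) {
        if (value == null || key == null) {
            return null; // signal "do not emit this row"
        }
        Map<String, Object> row = new HashMap<>();
        row.put("key", key);
        row.put("value", value);
        return row;
    }

    // Drop null rows before they reach the sink, mimicking a Filter
    // transform placed just before BigQueryIO.write().
    static List<Map<String, Object>> nonNullRows(List<Map<String, Object>> rows) {
        return rows.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(createTableRow(1.5, "a"));
        rows.add(createTableRow(null, "b")); // would otherwise emit a null row
        System.out.println(nonNullRows(rows).size()); // prints 1
    }
}
```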

The error is generated in the logs every second. How could I handle this error? Thanks in advance for sharing your experience.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
