How to handle a RuntimeException: GoogleJsonResponseException: 400 Bad Request error in a Dataflow streaming job writing data to BigQuery
I built a streaming job in Dataflow with Java. My data source is a table in Bigtable; I read it into fixed windows (Apache Beam) every 3 minutes and execute four transformations. Finally, I write to three destinations, and the error occurs constantly in the third one. I currently do not know the origin of the error. I even added .skipInvalidRows() when writing to BigQuery, but the error continues.
Below I show the steps of my pipeline.
Step: read every 3 minutes
PCollection<List<ModelTableOfBigTable>> pCollectionTableOfBigTable = p
.apply("", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(options.getWindowPeriod())))
.apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(options.getWindowPeriod()))))
.apply("Scan TableOfBigTable", ParDo.of(new ScanTableOfBigTableDoFn(bigtableTableConfig, options)));
Step: transformations
pCollectionTableOfBigTable - PCollection<List<ModelTableOfBigTable>>
-> FirstTransformationCustom
-> pCollectionToMean - PCollection<KV<String, Double>>
-> Mean Transformation
-> pCollectionMean - PCollection<KV<String, Double>>
-> SecondTransformationCustom
-> pCollectionToGroupByKey - PCollection<KV<String, KV<String, Double>>>
-> GroupByKey Transformation
-> pCollectionGroupByKey - PCollection<KV<String, Iterable<KV<String, Double>>>>
-> CalculateTransformationCustom
-> pCollectionToFirestore - PCollection<ModelResultsCalculated>
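For clarity, the per-window computation this chain performs can be simulated in plain Java (keys and values here are hypothetical; the real pipeline uses Beam's Mean.perKey() and GroupByKey transforms):

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of one window's computation (names are hypothetical)
class WindowCalc {
    // Equivalent of Beam's Mean.perKey() on a PCollection<KV<String, Double>>
    static Map<String, Double> meanPerKey(List<Map.Entry<String, Double>> kvs) {
        return kvs.stream().collect(Collectors.groupingBy(
                e -> e.getKey(),
                Collectors.averagingDouble(e -> e.getValue())));
    }

    // Equivalent of GroupByKey on KV<String, KV<String, Double>>:
    // collect every inner (key, value) pair under its outer key
    static Map<String, List<Map.Entry<String, Double>>> groupByKey(
            List<Map.Entry<String, Map.Entry<String, Double>>> kvs) {
        return kvs.stream().collect(Collectors.groupingBy(
                e -> e.getKey(),
                Collectors.mapping(e -> e.getValue(), Collectors.toList())));
    }
}
```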
Step: write to Firestore (works without errors in the logs)
pCollectionToFirestore.apply("Write Firestore", ParDo.of(new FirestoreWriteDoFn<>(options)));
Step: write to the logs for debugging (works)
pCollectionToFirestore.apply(
    "Logs",
    ParDo.of(
        new DoFn<ModelResultsCalculated, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            ModelResultsCalculated result = c.element();
            LOG.info("{}", result.toString());
          }
        }));
Step: write to BigQuery (works, but it shows an error in the log)
I previously used .skipInvalidRows(), but the error was still shown in the logs.
WriteResult writeResult = pCollectionToFirestore.apply("Write History BQ",
BigQueryIO.<ModelResultsCalculated>write()
.to(options.getBigqueryTableHistory())
.withFormatFunction((ModelResultsCalculated result) -> Utils.createTableRow(result))
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
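Since the write already uses .withExtendedErrorInfo(), the failed-insert side output on writeResult can be consumed to log exactly which rows (if any) BigQuery rejects; a sketch continuing from the code above:

```
// Sketch: inspect the rows BigQuery rejects, continuing from writeResult above.
// getFailedInsertsWithErr() is available because .withExtendedErrorInfo() is set.
writeResult.getFailedInsertsWithErr()
    .apply("Log Failed Inserts", ParDo.of(
        new DoFn<BigQueryInsertError, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            BigQueryInsertError err = c.element();
            LOG.error("Failed row: {} error: {}", err.getRow(), err.getError());
          }
        }));
```

This side output stays empty in my case, which suggests the failing requests carry no rows at all.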
So when I look at the data in my BigQuery table, everything looks good, but when I look at the error logs in Dataflow I see the following:
java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/<dataset_name>/tables/<table_name>/insertAll?prettyPrint=false
{
"code" : 400,
"errors" : [ {
"domain" : "global",
"message" : "No rows present in the request.",
"reason" : "invalid"
} ],
"message" : "No rows present in the request.",
"status" : "INVALID_ARGUMENT"
}
The error is generated in the log constantly, every second. How could I handle this error? Thanks in advance for sharing your experience.
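If I understand the message correctly, "No rows present in the request." means the insertAll request body carried an empty rows array, rather than any particular row being invalid, which would explain why .skipInvalidRows() changes nothing. A stand-alone illustration of the guard that would avoid such a request, with a hypothetical sendInsertAll standing in for the streaming API call:

```java
import java.util.*;

// Stand-alone illustration: never issue an insertAll call with zero rows.
// sendInsertAll is a hypothetical stand-in for the BigQuery streaming API call.
class InsertGuard {
    static int callsMade = 0;

    static void sendInsertAll(List<Map<String, Object>> rows) {
        callsMade++; // a real call with an empty rows list would be rejected
                     // with "No rows present in the request."
    }

    static boolean insertIfNonEmpty(List<Map<String, Object>> rows) {
        if (rows == null || rows.isEmpty()) {
            return false; // skip the request entirely instead of sending an empty batch
        }
        sendInsertAll(rows);
        return true;
    }
}
```

In the actual pipeline the equivalent would presumably be ensuring that a window which produced no elements never reaches the BigQuery write.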
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow