A hot key <hot-key-name> was detected in our Dataflow pipeline
We have been facing a hot key issue in our Dataflow pipeline (a streaming pipeline that uses batch loads into BigQuery -- we chose batch loads to keep costs down):
We route data to the corresponding table based on its decoder value: for example, records with the http decoder go to the http table, and records with the ssl decoder go to the ssl table.
So the BigQuery ingestion uses dynamic destinations, and the key is the destination table spec for each record.
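For context, the grouping key is just the formatted table spec, so every record with the same decoder collapses onto a single key. A minimal plain-Java sketch of that mapping (project and dataset values taken from the error log below):

```java
public class TableSpecKey {
    static final String PROJECT_ID = "ace-prod-300923";
    static final String DATASET = "ml_dataset_us_central1";

    // Builds the destination table spec that becomes the grouping key.
    // All records sharing a decoder share this key.
    static String buildTableSpec(String decoder) {
        return String.format("%s:%s.%s", PROJECT_ID, DATASET, decoder);
    }
}
```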
An example error log:
A hot key
'key: tableSpec: ace-prod-300923:ml_dataset_us_central1.ssl tableDescription: Table for ssl shard: 1'
was detected in step
'insertTableRowsToBigQuery/BatchLoads/SinglePartitionsReshuffle/GroupByKey/ReadStream' with age of '1116.266s'.
This is a symptom of key distribution being skewed.
To fix, please inspect your data and pipeline to ensure that elements are evenly distributed across your key space.
The error is detected in this step: 'insertTableRowsToBigQuery/BatchLoads/SinglePartitionsReshuffle/GroupByKey/ReadStream'.
The hot key issue comes from the nature of our data: some decoders produce disproportionately many records, so their table-spec keys dominate the key space. And, again, ours is a streaming pipeline.
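To confirm the skew before changing the pipeline, it can help to count records per destination key on a sample. A minimal plain-Java sketch (the sample data below is made up for illustration):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeySkewCheck {
    // Counts records per decoder (i.e. per destination key) to reveal
    // which keys are hot.
    static Map<String, Long> countPerKey(List<String> decoders) {
        Map<String, Long> counts = new TreeMap<>();
        for (String d : decoders) {
            counts.merge(d, 1L, Long::sum);
        }
        return counts;
    }
}
```

Running this on a sample where most records use the ssl decoder would show one count far larger than the others, which is exactly the skew Dataflow is warning about.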
We have read the documentation provided by Google on hot keys, but we are still not sure how to apply the suggested fixes:
Dataflow Shuffle / Streaming Engine: our project is already using Streaming Engine.
Rekey: this doesn't seem to apply to our case, since the key is the destination table spec, and for the ingestion to work the key has to match the existing table spec in BigQuery.
Combine.PerKey.withHotKeyFanout(): we don't know how to apply this, because the key is generated inside the insertTableRowsToBigQuery step. In that step we use BigQueryIO to write to BigQuery, and the key comes from the dynamically generated table names based on the current window or value ("Sharding BigQuery output tables" in the docs).
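For reference, both withHotKeyFanout() and manual rekeying work by "salting" a hot key with a shard suffix, so one logical key becomes several physical keys that can be grouped on different workers and recombined afterwards. Since BigQueryIO builds the table-spec key internally, we can't apply this to that step directly, but the underlying idea looks like this (hypothetical helper, not part of our pipeline):

```java
import java.util.concurrent.ThreadLocalRandom;

public class SaltedKey {
    // Appends a random shard suffix so one logical key becomes
    // `fanout` physical keys, spreading the group-by load.
    static String salt(String key, int fanout) {
        int shard = ThreadLocalRandom.current().nextInt(fanout);
        return key + "#" + shard;
    }

    // Strips the shard suffix when re-merging the partial results
    // back under the original key.
    static String unsalt(String saltedKey) {
        int idx = saltedKey.lastIndexOf('#');
        return idx >= 0 ? saltedKey.substring(0, idx) : saltedKey;
    }
}
```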
Attached is the code for the step where the hot key is detected:
toBq.apply("insertTableRowsToBigQuery",
        BigQueryIO.<DataIntoBQ>write()
                .to((ValueInSingleWindow<DataIntoBQ> dataIntoBQ) -> {
                    String decoder = dataIntoBQ.getValue().getDecoder(); // getter function
                    if (!listOfProtocols.contains(decoder)) {
                        LOG.error("wrong bigquery table decoder destination: " + decoder);
                    }
                    String destination = String.format("%s:%s.%s",
                            PROJECT_ID, DATASET, decoder);
                    // Note: do not catch and return null here -- a null
                    // TableDestination makes the write transform fail with an
                    // NPE. Let an unexpected exception fail the bundle so
                    // Dataflow retries it instead.
                    return new TableDestination(
                            destination,             // table spec
                            "Table for " + decoder); // table description
                })
                .withFormatFunction(
                        (DataIntoBQ elem) -> new DataIntoBQ().toTableRow(elem))
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(Duration.standardMinutes(3))
                .withAutoSharding()
                .withCustomGcsTempLocation(
                        ValueProvider.StaticValueProvider.of(options.getGcpTempLocation()))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
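One workaround we are considering (not yet verified at scale): since the set of decoders is known up front (listOfProtocols), the collection could be split per decoder before writing, so that each BigQueryIO.write targets a single static table and no single dynamic-destination key can dominate. The routing decision itself is trivial; a sketch of the partition-index logic such a split would use (the protocol list here is a made-up example, and the extra trailing partition is for unexpected decoders):

```java
import java.util.List;

public class DecoderPartitioner {
    // Example decoder list standing in for listOfProtocols; the real
    // list comes from the pipeline configuration.
    static final List<String> PROTOCOLS = List.of("http", "ssl", "dns");

    // Maps a decoder to a partition index; unknown decoders go to the
    // extra "error" partition at index PROTOCOLS.size().
    static int partitionFor(String decoder) {
        int idx = PROTOCOLS.indexOf(decoder);
        return idx >= 0 ? idx : PROTOCOLS.size();
    }
}
```

In Beam, a function like this could back a Partition transform producing one PCollection per decoder, each with its own plain (non-dynamic) BigQuery write.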
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow