Substitute ints into Dataflow via Cloud Build yaml

I've got a streaming Dataflow pipeline, written in Java with Beam 2.35. It commits data to BigQuery via the Storage Write API. Initially the code looks like this:

BigQueryIO.writeTableRows()
  .withTimePartitioning(/* some column */)
  .withClustering(/* another column */)
  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  .withTriggeringFrequency(Duration.standardSeconds(30))
  .withNumStorageWriteApiStreams(20) // want to make this dynamic

This code runs in different environments, e.g. Dev and Prod. In Dev I want 2 Storage Write API streams and in Prod I want 20, and I'm trying to pass these values in at deploy time through Cloud Build.

The cloudbuild-dev.yaml looks like this:

steps:
  - lots-of-steps
args:
 --numStorageWriteApiStreams=${_NUM_STORAGEWRITEAPI_STREAMS}
substitutions:
  _PROJECT: dev-project
  _NUM_STORAGEWRITEAPI_STREAMS: '2'

I expose the substitution to the job code through my pipeline options interface:

  ValueProvider<String> getNumStorageWriteApiStreams();
  void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);

I then refactor the writeTableRows() call to use options.getNumStorageWriteApiStreams():

BigQueryIO.writeTableRows()
  .withTimePartitioning(/* some column */)
  .withClustering(/* another column */)
  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  .withTriggeringFrequency(Duration.standardSeconds(30))
  .withNumStorageWriteApiStreams(Integer.parseInt(String.valueOf(options.getNumStorageWriteApiStreams())))

Now it's dynamic, but the build fails with java.lang.IllegalArgumentException: methods with same signature getNumStorageWriteApiStreams() but incompatible return types: [class java.lang.Integer, interface org.apache.beam.sdk.options.ValueProvider]

My understanding was that Integer.parseInt returns an int, which is exactly what I need, since withNumStorageWriteApiStreams() takes an int.

I'd appreciate any help here, thanks.



Solution 1:[1]

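It turns out BigQueryOptions.java already declares a getNumStorageWriteApiStreams() method that returns an Integer. I was unknowingly redeclaring it in my own options interface with a different return type (ValueProvider<String>), oops. Beam checks the getters of all the options interfaces it combines, so the same getter name with incompatible return types fails with the IllegalArgumentException above. Deleting my getter and setter and reading the built-in Integer option fixes it: the --numStorageWriteApiStreams argument from Cloud Build binds to that option directly, and since the job is launched straight from Cloud Build rather than from a template, a ValueProvider isn't needed (withNumStorageWriteApiStreams() takes a plain int anyway).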
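To make the error message concrete, here is a toy model of the kind of check involved: it scans several interfaces and reports any getter name that is declared with more than one return type. BuiltInBigQueryOptions, MyOptions, FixedOptions and the Provider wrapper are hypothetical stand-ins for BigQueryOptions, the question's options interface and Beam's ValueProvider. This is a sketch under those assumptions, not Beam's actual validation code (which, for example, may treat assignable types as compatible rather than requiring exact equality).

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OptionsClashCheck {

  // Hypothetical stand-in for the built-in getter in Beam's BigQueryOptions.
  public interface BuiltInBigQueryOptions {
    Integer getNumStorageWriteApiStreams();
  }

  // Hypothetical stand-in for a ValueProvider-style wrapper (not Beam's ValueProvider).
  public interface Provider<T> {
    T get();
  }

  // Hypothetical stand-in for the custom options interface from the question.
  public interface MyOptions {
    Provider<String> getNumStorageWriteApiStreams();
  }

  // The same getter declared with a matching return type: no clash expected.
  public interface FixedOptions {
    Integer getNumStorageWriteApiStreams();
  }

  /**
   * Returns one message per getter name declared with differing return types
   * across the given interfaces. Toy check only; not Beam's implementation.
   */
  public static List<String> findClashes(Class<?>... interfaces) {
    Map<String, Class<?>> firstSeen = new HashMap<>();
    List<String> clashes = new ArrayList<>();
    for (Class<?> iface : interfaces) {
      for (Method m : iface.getMethods()) {
        // Only consider zero-argument getters, like options properties.
        if (!m.getName().startsWith("get") || m.getParameterCount() != 0) {
          continue;
        }
        Class<?> previous = firstSeen.putIfAbsent(m.getName(), m.getReturnType());
        if (previous != null && !previous.equals(m.getReturnType())) {
          clashes.add(m.getName() + "(): " + previous.getName()
              + " vs " + m.getReturnType().getName());
        }
      }
    }
    return clashes;
  }

  public static void main(String[] args) {
    System.out.println(findClashes(BuiltInBigQueryOptions.class, MyOptions.class));
    System.out.println(findClashes(BuiltInBigQueryOptions.class, FixedOptions.class));
  }
}
```

The first call reports a single clash on getNumStorageWriteApiStreams(), mirroring the error in the question, while the second returns an empty list because both declarations agree on Integer.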

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java#L95-L98
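With Beam itself, the change amounts to deleting the custom getter and setter, keeping the --numStorageWriteApiStreams=${_NUM_STORAGEWRITEAPI_STREAMS} argument in the Cloud Build step, and calling .withNumStorageWriteApiStreams(options.as(BigQueryOptions.class).getNumStorageWriteApiStreams()); the Integer unboxes to the int the method expects, so Integer.parseInt isn't needed. The plain-Java sketch below is a toy illustration of why that works: the options parser converts the raw command-line string into the getter's declared return type. StreamOptions, parseArgs and readOption are hypothetical and only mimic the idea behind PipelineOptionsFactory, not its implementation.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class TypedOptionParsing {

  // Hypothetical options interface: the getter's declared type drives conversion.
  public interface StreamOptions {
    Integer getNumStorageWriteApiStreams();
  }

  /** Collects "--name=value" arguments into a map (toy parser, not Beam's). */
  public static Map<String, String> parseArgs(String[] args) {
    Map<String, String> values = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (arg.startsWith("--") && eq > 2) {
        values.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    return values;
  }

  /** Converts the raw string for `name` into the return type of its getter. */
  public static Object readOption(Class<?> iface, String name, Map<String, String> values) {
    String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
    Method method;
    try {
      method = iface.getMethod(getter);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No getter for option: " + name, e);
    }
    String raw = values.get(name);
    if (raw == null) {
      return null; // option not passed on the command line
    }
    Class<?> type = method.getReturnType();
    if (type == Integer.class) {
      return Integer.valueOf(raw);
    }
    if (type == Boolean.class) {
      return Boolean.valueOf(raw);
    }
    if (type == String.class) {
      return raw;
    }
    throw new IllegalArgumentException("Unsupported option type: " + type.getName());
  }

  public static void main(String[] args) {
    // Roughly what the launch step receives once Cloud Build substitutes '2'.
    String[] launchArgs = {"--numStorageWriteApiStreams=2"};
    Object value = readOption(StreamOptions.class, "numStorageWriteApiStreams", parseArgs(launchArgs));
    int numStreams = (Integer) value; // already typed; no parseInt in pipeline code
    System.out.println(numStreams);
  }
}
```

Because the conversion is driven by the declared type, the pipeline code only ever sees an Integer, which is why declaring (or reusing) an Integer option is simpler than wrapping the value in ValueProvider<String> and parsing it by hand.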

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 onji