Streaming Pub/Sub to Bigtable using Apache Beam Dataflow (Java)
I am trying to write Pub/Sub JSON messages to Bigtable. I am running the code from my local machine and the Dataflow job gets created, but I don't see any data written to the Bigtable instance, and no error is thrown in the console or in the Dataflow job. I also tried writing a hardcoded value to Bigtable, but that didn't work either. Can anyone suggest what might be wrong or guide me on this issue?
```java
try {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(projectArgs).create();
    options.setRunner(DataflowRunner.class);
    System.out.println("tempLocation: " + options.getTempLocation());

    Pipeline p = Pipeline.create(options);

    p.apply("Read PubSub Messages",
            PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(ParDo.of(new RowGenerator()))
     .apply(CloudBigtableIO.writeToTable(bigtableConfig));

    p.run();
} catch (Exception e) {
    System.out.println(e);
}
```
```java
@ProcessElement
public void processElement(ProcessContext context) {
    try {
        String decodedMessageAsJsonString = context.element();
        System.out.println("decodedMessageAsJsonString: " + decodedMessageAsJsonString);

        // Row key: current UTC time as epoch seconds.
        String rowKey = String.valueOf(
                LocalDateTime.ofInstant(Instant.now(), ZoneId.of("UTC"))
                        .toEpochSecond(ZoneOffset.UTC));
        System.out.println("rowKey: " + rowKey);

        Put put = new Put(rowKey.getBytes());
        put.addColumn("VALUE".getBytes(), "VALUE".getBytes(),
                decodedMessageAsJsonString.getBytes());
        // put.addColumn(Bytes.toBytes("IBS"), Bytes.toBytes("name"), Bytes.toBytes("ram"));
        context.output(put);
    } catch (Throwable e) {
        System.out.println(e);
    }
}
```
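For reference, the row key above is just the current UTC time as epoch seconds; the `LocalDateTime` round trip is equivalent to `Instant.getEpochSecond()`. A minimal standalone sketch of the same computation:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class RowKeyDemo {
    // Same computation as in processElement: UTC epoch seconds as a string.
    static String rowKey(Instant now) {
        return String.valueOf(
                LocalDateTime.ofInstant(now, ZoneId.of("UTC"))
                        .toEpochSecond(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        Instant t = Instant.ofEpochSecond(1_700_000_000L);
        System.out.println(rowKey(t));          // prints 1700000000
        System.out.println(t.getEpochSecond()); // equivalent, simpler form
    }
}
```

One side effect of this key scheme: all messages arriving within the same second map to the same row and column, so by default a read returns only the latest value for that second.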
Solution 1:[1]
I don't see any issue with the Bigtable side of the pipeline. Just make sure that the column family (which I assume is "VALUE") exists on the destination table.
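One way to confirm the column family exists is the `cbt` CLI; the project, instance, and table names below are placeholders:

```shell
# List the tables in the instance, then the column families of the target table.
cbt -project my-project -instance my-instance ls
cbt -project my-project -instance my-instance ls my-table
```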
Are you sure that you are reading the right Pub/Sub subscription, and that messages are actually being sent to Pub/Sub? If all of that is correct, there may be some issue in the Pub/Sub configuration. Maybe add the Pub/Sub tag to the question so someone from the Pub/Sub community can help.
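One thing worth double-checking in the question's code: it calls `PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION)`. If that constant actually names a subscription, the read should use `fromSubscription(...)` instead, and each method expects its own fully qualified resource name. A minimal sketch of the two name formats (project and resource names are hypothetical):

```java
public class PubSubPaths {
    // Fully qualified name expected by PubsubIO.readStrings().fromSubscription(...)
    static String subscriptionPath(String project, String subscription) {
        return "projects/" + project + "/subscriptions/" + subscription;
    }

    // Fully qualified name expected by PubsubIO.readStrings().fromTopic(...)
    static String topicPath(String project, String topic) {
        return "projects/" + project + "/topics/" + topic;
    }

    public static void main(String[] args) {
        System.out.println(subscriptionPath("my-project", "my-sub"));
        // prints projects/my-project/subscriptions/my-sub
        System.out.println(topicPath("my-project", "my-topic"));
        // prints projects/my-project/topics/my-topic
    }
}
```

Note that reading `fromTopic(...)` creates a new, temporary subscription at pipeline start, so it only sees messages published after the job is running.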
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Verma |
