Spring Cloud Data Flow custom stream auto Kafka binding
I'm currently building my own streams in Spring Cloud Data Flow, developing my own processors and sinks, but I'm a bit stuck with the internal Kafka bindings. Here is an example.
I create this stream:
The code of the custom processor looks like this:
    @EnableConfigurationProperties({Config.class})
    @Configuration
    public class ProcessorIndoorGps {
        private CoordDispatcher dispatcher = new CoordDispatcher();

        @Bean
        public Function<String, List<Message>> split() {
            System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
            System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
            System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
            System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
            return string -> {
                ....
The code and the stream work properly, but I'm not happy with these System.setProperty calls (they amount to the same thing as writing the properties in a properties file). They are necessary to bind the Kafka topics.
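To make it clearer what those four calls configure, here is a minimal sketch in plain Java that collects the same keys in one map instead of setting them inside the bean method. It relies on the Spring Cloud Stream convention that functional bindings are named `<function>-in-<index>` and `<function>-out-<index>`. The class `BindingProperties` and its helper methods are hypothetical names used only for illustration; they are not part of any Spring API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Spring class): gathers the binding properties in one place.
public class BindingProperties {

    // Functional binding names follow <function>-in-<index> / <function>-out-<index>.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Builds the same four properties that the question sets via System.setProperty.
    static Map<String, Object> forFunction(String functionName,
                                           String inDestination,
                                           String outDestination) {
        Map<String, Object> props = new LinkedHashMap<>();
        // Alias the function bindings to the plain "input"/"output" names.
        props.put("spring.cloud.stream.function.bindings." + inputBinding(functionName, 0), "input");
        props.put("spring.cloud.stream.function.bindings." + outputBinding(functionName, 0), "output");
        // Point the aliased bindings at the Kafka topics.
        props.put("spring.cloud.stream.bindings.input.destination", inDestination);
        props.put("spring.cloud.stream.bindings.output.destination", outDestination);
        return props;
    }

    public static void main(String[] args) {
        // Topic names copied from the question as written.
        forFunction("split", "processor-indorgps.time", "processor-indorgps.processor-indoorgps")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this could, for instance, be handed to `SpringApplication.setDefaultProperties(...)` before the application starts, or the same keys could be written to `application.properties`. Either way the `split()` bean would no longer change system properties as a side effect. Whether this fits the setup in question is an assumption.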
I remember that the older version used annotations (such as @Input and @Output) to bind the topics directly.
Is there a way to do this in the newest version?
Thanks a lot.
Versions:
- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3
Edit: here is some more information about the older version.
The code was something like this:
    @EnableBinding(Processor.class)
    @EnableConfigurationProperties({Config.class})
    @Configuration
    public class ProcessorIndoorGps {
        @Splitter(inputChannel = Processor.INPUT,
                  outputChannel = Processor.OUTPUT)
        public List<Message> split(String in) throws ParseException, IOException {
            String uuid = ".....";
            Long date = new Date().getTime();
            ...........
- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2
In this version, I did not need any properties to bind the Kafka topics. I think the binding was done by the @EnableBinding and @Splitter annotations, but these annotations do not exist in the new Spring Boot version.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow

