Spring Cloud Data Flow custom stream: automatic Kafka binding

I'm currently building my own streams in Spring Cloud Data Flow, developing my own processors and sinks, but I'm a bit stuck with the internal Kafka bindings. Here is an example:

I created this stream (shown as an image in the original post):

The code of the custom processor looks like this:

@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    private CoordDispatcher dispatcher = new CoordDispatcher();

    @Bean
    public Function<String, List<Message>> split() {
        System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
        System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
        System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
        System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
        return string -> {

....

The code and the stream work properly, but I'm not happy with these System.setProperty calls (they are equivalent to writing the same values in a properties file). They are necessary to bind the Kafka topics.
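To make the four properties above easier to read, here is a minimal plain-Java sketch that collects them in one place instead of scattering System.setProperty calls inside the bean method. The class name BindingProperties and its helper methods are hypothetical and not part of any Spring API; the only convention assumed is the binding naming scheme that already appears in the code above (function name, then -in- or -out-, then an index).

```java
import java.util.Properties;

// Hypothetical helper, not a Spring class: it only assembles property keys and values.
public class BindingProperties {

    // Functional binding names follow the pattern <function>-in-<index>.
    public static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Functional binding names follow the pattern <function>-out-<index>.
    public static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Builds the same four properties that the processor sets via System.setProperty:
    // two aliases (functional binding -> "input"/"output") and two topic destinations.
    public static Properties forFunction(String functionName,
                                         String inDestination,
                                         String outDestination) {
        Properties props = new Properties();
        props.setProperty(
            "spring.cloud.stream.function.bindings." + inputBinding(functionName, 0), "input");
        props.setProperty(
            "spring.cloud.stream.function.bindings." + outputBinding(functionName, 0), "output");
        props.setProperty("spring.cloud.stream.bindings.input.destination", inDestination);
        props.setProperty("spring.cloud.stream.bindings.output.destination", outDestination);
        return props;
    }

    public static void main(String[] args) {
        // Topic names copied from the question; adjust them to the actual stream.
        Properties props = forFunction(
            "split", "processor-indorgps.time", "processor-indorgps.processor-indoorgps");
        // Print key=value lines in sorted order, in the form a properties file would use.
        props.stringPropertyNames().stream().sorted()
            .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The printed key=value lines have the same form as entries in a properties file, which is the alternative mentioned above. The sketch does not remove the need for the properties; it only shows what each one is made of.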

I remember that the older version used annotations (such as @Input and @Output) to bind the topics directly.

Is there a way to do the same in the newest version?

Thanks a lot.

Versions:

- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3

Edit: here is some information about the older version.

The code looked something like this:

@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    @Splitter(inputChannel = Processor.INPUT, 
              outputChannel = Processor.OUTPUT)
    public List<Message> split(String in) throws ParseException, IOException {

        String uuid=".....";
        Long date = new Date().getTime();
        ...........


- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2

In this version, I did not need any properties to bind the Kafka topics. I think the binding was done by the @EnableBinding and @Splitter annotations, but these annotations do not exist in the new Spring Boot version.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
