'How to implement KStream-Ktable leftJoin using multiple key approache (pk-fk and fk-fk join) in Kafka-streams?

I'm working joining a stream and tables in kafka by using left-join approach, in that I am getting values for some joins because in this case join is implemented as foreign-key to Primary-key, which is working case.

Below is the implemented code for join. (foreignKey-primaryKey) 

In this case id is primary key for PassengerTable

 .selectKey((id, stop) -> {
        if (stop.getPassenger() != null && stop.getPassenger().getId() != null) {
            return stop.getPassenger().getId();
        }
        return 0L;
    })
    .leftJoin(table, (stop, passenger) -> {
        topic.Value scA3 = null;
        if (passenger!= null) {
            scA3 = topic.Value
                .newBuilder()
                .setId(passenger.getId())
                .build();
        }

        stop.setPassenger(scA3);
        return stop;
    })

And for some scenarios join is not implemented and getting null value for the object because that join is implented as primary-key to foreign-key.

Below is implemented code for pk-fk join.

In this case PassengerId is foreign-key of payment table and join is implementing between passenger table id (Pk) and paymentTable passengerId(fk).(join failed)

.selectKey((id, stop) -> {
            if (stop.getPayment() != null && stop.getPayment().getPassengerId() != null) {             
 return stop.getPayment().getPassengerId()
            }
            return 0L;
        })
        .leftJoin(paymentTable, (stop, payment) -> {
         logger.info("object  in left join payment : " + payment); // getting null value of payment obj because by default it is implenting join with id of payment table instead of passengerId this join is failed to implement.
            topic.Value paymentA3 = null;
            if (payment != null) {
                PaymentA3 = topic.Value
                    .newBuilder()
                    .getPassengerId(payment.getPassengerId())
                    .setId(payment.getId())
                    .build();
            }
            stop.setPayment(PaymentA3);
            return stop;
})

I have three scenarios:-
1) fk-pk join --> working
2) pk-fk join --> not working in my case
3) fk-fk join --> not working in my case

getting null values of object in 2nd and 3rd scenarios and join is not implementing in these cases.

How can I point a particular foreign key for implementing these joins (pk-fk and fk-fk) ? How to bind key for any join(leftKey or rightKey) for implementing joins in kafka?

So my question is that how can I implement join for these scenarios (pk-fk and fk-fk) ?

If anybody has idea on these type of joins please let me know, how can I proceed further for implementing this join?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source