'consumer spring cloud stream with kafka not working

I implement saga choreography microservices with Java Spring Boot. I develop a ordering system which allow user yo buy a new product and validating the customer`s cash money available in his account. I have two projects, First one is order-service which accept the order data and customer through the Rest Api then create a new order in the database with order status = ORDER_CREATED then call publisher to publish a message in kafka server and I can see the event in the order-event Kafka.

My problem the payment-service could not consume the order-event from Kafka.

what is the problem in the configuration class ?

Application yaml

spring:
  cloud:
    stream:
      function:
        definition : paymentProcessor
      bindings:
        paymentProcessor-in-0 :
          destination: order-event
        paymentProcessor-out-0 :
          destination: payment-event

server:
  port: 9090

Configuration

package com.woody.saga.choreography.payment.config;

import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.OrderStatus;
import com.woody.saga.choreography.pattern.event.PaymentEvent;
import com.woody.saga.choreography.payment.service.PaymentService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class PaymentConsumerConfig {

    private PaymentService paymentService;

    public PaymentService getPaymentService() {
        return paymentService;
    }

    @Autowired
    public void setPaymentService(PaymentService paymentService) {
        this.paymentService = paymentService;
    }

    @Bean
    public Function<Flux<OrderEvent>, Flux<PaymentEvent>> paymentProcessor() {
        return orderEventFlux -> orderEventFlux.flatMap(this::processPayment);
    }

    private Mono<PaymentEvent> processPayment(OrderEvent orderEvent) {
        if (OrderStatus.ORDER_CREATED.equals(orderEvent.getOrderStatus())) {
            return Mono.fromSupplier(() -> this.getPaymentService().newOrderEvent(orderEvent));
        }
        return Mono.fromSupplier(() -> this.getPaymentService().cancelOrderEvent(orderEvent));
    }
}

Service

package com.woody.saga.choreography.payment.service;

import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.PaymentEvent;

public interface PaymentService {

    PaymentEvent newOrderEvent(OrderEvent orderEvent);

    PaymentEvent cancelOrderEvent(OrderEvent orderEvent);

}
package com.woody.saga.choreography.payment.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.woody.saga.choreography.pattern.dto.OrderRequestDTO;
import com.woody.saga.choreography.pattern.dto.PaymentRequestDTO;
import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.PaymentEvent;
import com.woody.saga.choreography.pattern.event.PaymentStatus;
import com.woody.saga.choreography.payment.entity.UserTransaction;
import com.woody.saga.choreography.payment.repository.UserRepository;
import com.woody.saga.choreography.payment.repository.UserTransactionRepository;

@Service
public class PaymentServiceImpl implements PaymentService {

    private UserRepository userRepository;
    private UserTransactionRepository userTransactionRepository;

    public UserRepository getUserRepository() {
        return userRepository;
    }

    @Autowired
    public void setUserRepository(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public UserTransactionRepository getUserTransactionRepository() {
        return userTransactionRepository;
    }

    @Autowired
    public void setUserTransactionRepository(UserTransactionRepository userTransactionRepository) {
        this.userTransactionRepository = userTransactionRepository;
    }

    /*
     * get the user ID check the balance availability if the balance sufficient,
     * then Payment is completed and deduct amount from DB if not sufficient, then
     * cancel the order
     */

    @Override
    public PaymentEvent newOrderEvent(OrderEvent orderEvent) {
        OrderRequestDTO orderRqDto = orderEvent.getOrderRequestDto();
        Integer userId = orderRqDto.getUserId();
        PaymentRequestDTO paymentRqDto = new PaymentRequestDTO(orderRqDto.getOrderId(), userId, orderRqDto.getAmount());
        return getUserRepository().findById(userId)
        .filter(user -> user.getCashMoney() >= orderRqDto.getAmount())
        .map(user -> {
            
            user.setCashMoney(user.getCashMoney() - orderRqDto.getAmount());
            UserTransaction userTransaction = new UserTransaction(orderRqDto.getOrderId(), userId,
                    orderRqDto.getAmount());
            getUserTransactionRepository().save(userTransaction);
            return new PaymentEvent(paymentRqDto, PaymentStatus.PAYMENT_COMPLETED);
        }).orElse(new PaymentEvent(paymentRqDto, PaymentStatus.PAYMENT_FAILED));
    }

    @Override
    public PaymentEvent cancelOrderEvent(OrderEvent orderEvent) {
        // TODO Auto-generated method stub
        return null;
    }

}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source