Spring Cloud Stream consumer with Kafka not working
I am implementing saga choreography microservices with Java Spring Boot. I am developing an ordering system that allows a user to buy a new product and validates the cash available in the customer's account. I have two projects. The first one is order-service, which accepts the order and customer data through a REST API, creates a new order in the database with order status = ORDER_CREATED, and then calls a publisher to publish a message to the Kafka server; I can see the event in the order-event Kafka topic.
My problem is that the payment-service cannot consume the order-event from Kafka.
What is the problem in the configuration class?
Application yaml
# Spring Cloud Stream wiring for the payment-service.
spring:
  cloud:
    stream:
      function:
        # Name of the functional bean to bind (the paymentProcessor @Bean).
        definition: paymentProcessor
      bindings:
        # Input binding of paymentProcessor: events are read from order-event.
        paymentProcessor-in-0:
          destination: order-event
        # Output binding of paymentProcessor: results are written to payment-event.
        paymentProcessor-out-0:
          destination: payment-event
server:
  port: 9090
Configuration
package com.woody.saga.choreography.payment.config;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.OrderStatus;
import com.woody.saga.choreography.pattern.event.PaymentEvent;
import com.woody.saga.choreography.payment.service.PaymentService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Declares the {@code paymentProcessor} function bean that turns a stream of
 * {@link OrderEvent}s into a stream of {@link PaymentEvent}s by delegating each
 * event to the {@link PaymentService}.
 */
@Configuration
public class PaymentConsumerConfig {

    private PaymentService paymentService;

    /**
     * @return the service used to handle incoming order events
     */
    public PaymentService getPaymentService() {
        return this.paymentService;
    }

    /**
     * Setter-injection point for the payment service.
     *
     * @param paymentService service that handles order events
     */
    @Autowired
    public void setPaymentService(PaymentService paymentService) {
        this.paymentService = paymentService;
    }

    /**
     * Builds the function that maps every incoming order event to the payment
     * event produced for it.
     *
     * @return function from a flux of order events to a flux of payment events
     */
    @Bean
    public Function<Flux<OrderEvent>, Flux<PaymentEvent>> paymentProcessor() {
        return incomingOrders -> incomingOrders.flatMap(order -> processPayment(order));
    }

    /**
     * Routes a single order event: events with status {@code ORDER_CREATED} go
     * to {@link PaymentService#newOrderEvent}, every other status goes to
     * {@link PaymentService#cancelOrderEvent}.
     *
     * @param orderEvent the event to handle
     * @return a mono that invokes the service when subscribed; if the service
     *         returns {@code null} the mono completes without emitting
     */
    private Mono<PaymentEvent> processPayment(OrderEvent orderEvent) {
        // The status is inspected right away; only the service call is deferred.
        final boolean orderCreated = OrderStatus.ORDER_CREATED.equals(orderEvent.getOrderStatus());
        return Mono.fromSupplier(() -> orderCreated
                ? getPaymentService().newOrderEvent(orderEvent)
                : getPaymentService().cancelOrderEvent(orderEvent));
    }
}
Service
package com.woody.saga.choreography.payment.service;
import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.PaymentEvent;
/**
 * Handles order events on the payment side and produces the resulting
 * payment event.
 */
public interface PaymentService {
/**
 * Handles an order event; the payment processor calls this for events whose
 * status is ORDER_CREATED.
 *
 * @param orderEvent the order event to handle
 * @return the payment event produced for the order
 */
PaymentEvent newOrderEvent(OrderEvent orderEvent);
/**
 * Handles an order event; the payment processor calls this for every event
 * whose status is not ORDER_CREATED.
 *
 * @param orderEvent the order event to handle
 * @return the payment event produced for the order
 */
PaymentEvent cancelOrderEvent(OrderEvent orderEvent);
}
package com.woody.saga.choreography.payment.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.woody.saga.choreography.pattern.dto.OrderRequestDTO;
import com.woody.saga.choreography.pattern.dto.PaymentRequestDTO;
import com.woody.saga.choreography.pattern.event.OrderEvent;
import com.woody.saga.choreography.pattern.event.PaymentEvent;
import com.woody.saga.choreography.pattern.event.PaymentStatus;
import com.woody.saga.choreography.payment.entity.UserTransaction;
import com.woody.saga.choreography.payment.repository.UserRepository;
import com.woody.saga.choreography.payment.repository.UserTransactionRepository;
/**
 * {@link PaymentService} implementation that checks the user's cash balance
 * and records a {@link UserTransaction} for orders that can be paid.
 */
@Service
public class PaymentServiceImpl implements PaymentService {

    private UserRepository userRepository;
    private UserTransactionRepository userTransactionRepository;

    /**
     * @return the repository used to load and update users
     */
    public UserRepository getUserRepository() {
        return userRepository;
    }

    /**
     * Setter-injection point for the user repository.
     *
     * @param userRepository repository used to load and update users
     */
    @Autowired
    public void setUserRepository(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * @return the repository used to store user transactions
     */
    public UserTransactionRepository getUserTransactionRepository() {
        return userTransactionRepository;
    }

    /**
     * Setter-injection point for the user-transaction repository.
     *
     * @param userTransactionRepository repository used to store user transactions
     */
    @Autowired
    public void setUserTransactionRepository(UserTransactionRepository userTransactionRepository) {
        this.userTransactionRepository = userTransactionRepository;
    }

    /**
     * Looks up the ordering user and checks the balance. If the balance covers
     * the order amount, the amount is deducted, the updated user and a
     * {@link UserTransaction} are saved, and a {@code PAYMENT_COMPLETED} event
     * is returned. If the user is not found or the balance is insufficient, a
     * {@code PAYMENT_FAILED} event is returned and nothing is written.
     *
     * @param orderEvent the order event carrying the order request
     * @return the payment event describing the outcome
     */
    @Override
    public PaymentEvent newOrderEvent(OrderEvent orderEvent) {
        OrderRequestDTO orderRqDto = orderEvent.getOrderRequestDto();
        Integer userId = orderRqDto.getUserId();
        PaymentRequestDTO paymentRqDto = new PaymentRequestDTO(orderRqDto.getOrderId(), userId,
                orderRqDto.getAmount());

        return getUserRepository().findById(userId)
                .filter(user -> user.getCashMoney() >= orderRqDto.getAmount())
                .map(user -> {
                    user.setCashMoney(user.getCashMoney() - orderRqDto.getAmount());
                    // Save explicitly: this method is not transactional, so the
                    // changed balance would otherwise never be written back.
                    getUserRepository().save(user);
                    UserTransaction userTransaction = new UserTransaction(orderRqDto.getOrderId(), userId,
                            orderRqDto.getAmount());
                    getUserTransactionRepository().save(userTransaction);
                    return new PaymentEvent(paymentRqDto, PaymentStatus.PAYMENT_COMPLETED);
                })
                // Build the failure event only when it is actually needed.
                .orElseGet(() -> new PaymentEvent(paymentRqDto, PaymentStatus.PAYMENT_FAILED));
    }

    /**
     * Not implemented yet.
     *
     * @param orderEvent the order event to handle
     * @return always {@code null} for now
     */
    @Override
    public PaymentEvent cancelOrderEvent(OrderEvent orderEvent) {
        // TODO: implement cancellation handling; callers currently receive null.
        return null;
    }
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
