When consuming an Avro schema object from Kafka in a Spring Boot application and then storing the message in a database, do I need a mapping class?
I have a Spring Boot application that will be listening to a Kafka topic which is passing messages that conform to an Avro schema. The Spring Boot app will be responsible for taking these messages from Kafka and storing them in a Postgres database. The fields on the Kafka message map almost 1 to 1 to the database columns.
The way it's coded right now, we have a mapper class that takes the deserialized Avro object and maps it to a POJO, and then uses that POJO to map to the db. This all feels very manual the way it's written right now and I guess I'm just wondering if it's actually needed, or if there is some Spring/Avro auto-magical way of doing this? This is the code in our mapper class to give you some sense of what I'm referring to:
// Imports needed by the date handling below; Appointment, AppointmentDTO,
// AppointmentStatus, and mapToProcedure are project classes/methods defined elsewhere.
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;

public class AppointmentMapper {

    static final String NO_DESCRIPTION = "";
    static final String NO_FACILITY = "";

    public static AppointmentDTO mapToApi(Appointment appointment) {
        return AppointmentDTO.builder()
                .status(appointment.getStatus())
                .consumerId(appointment.getConsumerId())
                .providerId(appointment.getProviderId())
                .startDate(appointment.getStartDate())
                .endDate(appointment.getEndDate())
                .duration(appointment.getDuration())
                .businessId(appointment.getBusinessId())
                .appointmentId(appointment.getAppointmentId())
                .facilityId(appointment.getFacilityId())
                .description(appointment.getDescription())
                .officeId(appointment.getOfficeId())
                .pmsId(appointment.getPmsId())
                .build();
    }

    public static Appointment mapToDb(com.kafka.avro.model.appointment.Appointment avroAppointment) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(avroAppointment.getBase().getGuid());
        appointment.setConsumerId(avroAppointment.getConsumerGuid().orElse(null));
        if (avroAppointment.getStatus().isPresent()) {
            AppointmentStatus status = AppointmentStatus.valueOf(avroAppointment.getStatus().get().name());
            appointment.setStatus(status);
        } else {
            appointment.setStatus(AppointmentStatus.UNSPECIFIED);
        }
        appointment.setPmsId(avroAppointment.getBase().getPmsId());
        appointment.setDescription(avroAppointment.getDescription().orElse(NO_DESCRIPTION));
        appointment.setOfficeId(avroAppointment.getOfficeId().orElse(null));
        appointment.setProviderId(avroAppointment.getProvider().orElse(null));
        appointment.setBusinessId(avroAppointment.getBase().getBusinessGuid());
        avroAppointment.getAppointmentCodes().ifPresent(appointmentCodes ->
                appointmentCodes.forEach(appointmentCode ->
                        appointment.getAppointmentProcedures().add(mapToProcedure(appointmentCode))));
        avroAppointment.getDate().ifPresent(instant -> {
            ZonedDateTime startDate = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));
            ZonedDateTime endDate = null;
            int duration = avroAppointment.getDuration().orElse(0);
            appointment.setDuration(duration);
            appointment.setStartDate(startDate);
            if (duration > 0) {
                endDate = startDate.plus(duration, TimeUnit.HOURS.toChronoUnit());
            }
            appointment.setEndDate(endDate);
        });
        appointment.setFacilityId(avroAppointment.getFacilityId().orElse(NO_FACILITY));
        return appointment;
    }

    public static Appointment mapToDb(AppointmentDTO appointmentDTO) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(appointmentDTO.getAppointmentId());
        appointment.setEndDate(appointmentDTO.getEndDate());
        appointment.setStartDate(appointmentDTO.getStartDate());
        appointment.setPmsId(appointmentDTO.getPmsId());
        appointment.setFacilityId(appointmentDTO.getFacilityId());
        appointment.setBusinessId(appointmentDTO.getBusinessId());
        appointment.setConsumerId(appointmentDTO.getConsumerId());
        appointment.setDuration(appointmentDTO.getDuration());
        appointment.setOfficeId(appointmentDTO.getOfficeId());
        appointment.setStatus(appointmentDTO.getStatus());
        appointment.setProviderId(appointmentDTO.getProviderId());
        return appointment;
    }
}
Solution 1:[1]
> some Spring/Avro auto-magical way of doing this?
Short answer: yes, but only if you use the Kafka Connect JDBC Sink with a Schema Registry instead. It uses the Connect API's internal Schema/Struct models to generate the database DDL statements. It'll work for more than just Avro, too.
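As a rough sketch, a standalone JDBC sink config could look like the following; the topic name, connection details, and primary-key field are placeholders for your own setup, not values taken from the question:
# Minimal JDBC sink sketch (Confluent JDBC connector); all values are placeholders
name=appointments-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=appointments
connection.url=jdbc:postgresql://localhost:5432/appointments_db
connection.user=postgres
connection.password=postgres
insert.mode=upsert
pk.mode=record_value
pk.fields=appointmentId
auto.create=true
auto.evolve=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
With auto.create and auto.evolve enabled, the sink derives the table DDL from the Avro schema in the registry, which is what removes the hand-written mapping layer.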
If the data in the topic doesn't map exactly to the format you want in the database, you can still use Spring Kafka / Spring Cloud Stream to consume and map the data onto an output topic that the Connect sink then writes from. Alternatively, you can use built-in or custom Single Message Transforms (SMTs) in the Connect API.
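For the first option, a minimal Spring Kafka listener might look like the sketch below. The topic names and the relay class are hypothetical, it reuses the question's existing mapper, and it assumes the DTO exposes Lombok-style getters:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical relay: consume the raw Avro record, remap it, and publish
// the result to the topic the Connect sink reads. Topic names are placeholders.
@Component
public class AppointmentRelay {

    private final KafkaTemplate<String, AppointmentDTO> template;

    public AppointmentRelay(KafkaTemplate<String, AppointmentDTO> template) {
        this.template = template;
    }

    @KafkaListener(topics = "appointments.raw")
    public void relay(com.kafka.avro.model.appointment.Appointment avro) {
        // Reuse the existing mapper from the question, keyed by appointment id
        Appointment entity = AppointmentMapper.mapToDb(avro);
        AppointmentDTO dto = AppointmentMapper.mapToApi(entity);
        template.send("appointments.mapped", dto.getAppointmentId(), dto);
    }
}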
Otherwise, if you insist on using Spring and your own code, you need to (re)build that ORM layer yourself (along with database client handling, batching, error processing, etc.). ModelMapper is a useful Java library I've found for converting event models into domain/persistence models.
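As a sketch of that last option, assuming the Avro and entity field names mostly line up; the derived fields (start/end dates computed from date + duration) and the unwrapping of Avro's Optional getters would still need explicit handling:
import org.modelmapper.ModelMapper;
import org.modelmapper.convention.MatchingStrategies;

public class AppointmentModelMapper {

    private static final ModelMapper MAPPER = new ModelMapper();

    static {
        // STRICT matching avoids surprising partial-name matches
        MAPPER.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
    }

    public static Appointment toEntity(com.kafka.avro.model.appointment.Appointment avro) {
        // Same-named, same-typed fields are copied automatically; anything
        // derived or renamed needs an explicit TypeMap or post-mapping step
        return MAPPER.map(avro, Appointment.class);
    }
}
This replaces the field-by-field boilerplate, but note the trade-off: the mapping becomes reflective and convention-based, so a renamed field fails at runtime rather than at compile time.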
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stack Overflow |
