Unable to provide the expected data from an ItemReader to a KafkaItemWriter

I have a list of users, each with several accounts. The goal is to collect all account activity for the users and to activate special features if the accounts meet certain criteria. Here is the basic structure of the User class:

    class User {
        String userId;
        String name;
        ... // Other details about the user
        List<Account> accounts;
    }

I wrote an item reader that uses a ResultSetExtractor in a UserDao to return the users with their associated accounts. The query returns the user details and one associated account per row. However, I am stuck on how to pass the user data to the KafkaItemWriter. The writer that follows UserAccountReader expects the String userId as its key and the User as its value, like this:

     KafkaItemWriter<String, User> etc.

I do not know how to include the KafkaItemWriter in the step. Here is the current code, showing the basics of the ItemReader followed by the step definition:

    // This is the reader
    class UserAccountReader implements ItemReader {
        // Associates the user with their accounts
        private UserDao userDao;

        public List<User> read() {
            // Here the users have the right accounts
            return userDao.getUsersWithAccounts();
        }
    }

    // This is the step portion of the batch, which does not work
    public Step userAccountProcessingStep() throws Exception {
        return stepBuilderFactory.get("accountStep")
            .<User, User>chunk(50)
            .reader(userAccountReader) // returns the list of Users with accounts
            // What can I do here...
            .writer(kafkaItemWriter()) // Only accepts <String, User>
            .build();
    }

The issue is that I need to collect the users together with their accounts, so a RowMapper is not a suitable choice. These are special accounts, about 250 at most, so I want to be able to process them all together.

Is there a way to insert something that would make this compatible with the KafkaItemWriter?
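
One possible direction, sketched here under assumptions rather than offered as a confirmed fix: Spring Batch's `ItemReader<T>` contract is that `read()` returns a single item per call and `null` once the input is exhausted, so a reader declared for a `<User, User>` chunk would hand out one `User` at a time instead of the whole `List<User>`. The sketch below uses only the JDK so that it runs on its own. The local `ItemReader` interface is a stand-in for the Spring Batch one, the `Supplier` stands in for `userDao::getUsersWithAccounts`, and the `Function` stands in for the `Converter<User, String>` key mapper that a `KafkaItemWriter<String, User>` accepts. The `Account` fields, the constructors, and `ReaderSketchDemo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Stand-in for Spring Batch's ItemReader<T>: one item per call, null when done.
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical minimal Account; the real class is not shown in the question.
class Account {
    final String accountId;
    Account(String accountId) { this.accountId = accountId; }
}

class User {
    final String userId;
    final String name;
    final List<Account> accounts;

    User(String userId, String name, List<Account> accounts) {
        this.userId = userId;
        this.name = name;
        this.accounts = accounts;
    }

    String getUserId() { return userId; }
}

// Loads the fully assembled users once, then returns them one by one.
class UserAccountReader implements ItemReader<User> {
    // Assumption: in the real job this would be userDao::getUsersWithAccounts.
    private final Supplier<List<User>> loader;
    private Iterator<User> iterator;

    UserAccountReader(Supplier<List<User>> loader) { this.loader = loader; }

    @Override
    public User read() {
        if (iterator == null) {
            iterator = loader.get().iterator(); // lazy load on first read
        }
        return iterator.hasNext() ? iterator.next() : null; // null ends the step
    }
}

class ReaderSketchDemo {
    // Plays the role of the writer's key mapper: derives the Kafka key from the item.
    static final Function<User, String> KEY_MAPPER = User::getUserId;

    // Drains the reader the way a chunk-oriented step would, recording key and account count.
    static List<String> drainKeys(ItemReader<User> reader) throws Exception {
        List<String> keys = new ArrayList<>();
        User user;
        while ((user = reader.read()) != null) {
            keys.add(KEY_MAPPER.apply(user) + ":" + user.accounts.size());
        }
        return keys;
    }

    public static void main(String[] args) throws Exception {
        List<User> users = List.of(
            new User("u1", "Ann", List.of(new Account("a1"), new Account("a2"))),
            new User("u2", "Bob", List.of(new Account("a3"))));
        System.out.println(drainKeys(new UserAccountReader(() -> users)));
    }
}
```

With the real classes, the writer side would presumably not need an extra processor: `KafkaItemWriter` takes the key from an item key mapper (for example `User::getUserId` via `setItemKeyMapper`, or `itemKeyMapper(...)` on `KafkaItemWriterBuilder`), so the step can stay a `<User, User>` chunk. This assumes a `KafkaTemplate<String, User>` with a default topic is configured. Spring Batch's `ListItemReader` may also serve in place of the hand-written iterator if loading all users up front is acceptable.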



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow