How to use multiple ClientKafka instances in one service?

I have a problem using multiple ClientKafka instances in one service. Here is my implementation:

@Controller()
export class ApiController implements OnModuleInit {
  constructor(
    @Inject("ACCOUNT_SERVICE") private readonly accountKafkaClient: ClientKafka,
    @Inject("WORKSPACE_SERVICE") private readonly workspaceKafkaClient: ClientKafka
  ) { }

  async onModuleInit() {
    const requestPatterns = [
      'topic'
    ];
    requestPatterns.forEach((pattern) => {
      this.accountKafkaClient.subscribeToResponseOf(`account.${pattern}`);
    });
    await this.accountKafkaClient.connect();
  }

  async onModuleDestroy() {
    await this.accountKafkaClient.close();
  }

  @Get()
  async sendMessage() {
    const data = {
      msg: "account.topic"
    }

    const kafkaResponse = this.accountKafkaClient.send<any>('account.topic', JSON.stringify(data));
    const response = await firstValueFrom(kafkaResponse);

    // This is not working: WORKSPACE_SERVICE does not receive any message.
    const kafkaResponse2 = this.workspaceKafkaClient.send<any>('workspace.topic', JSON.stringify(response));

    return await firstValueFrom(kafkaResponse2);
  }
}

Can someone tell me why workspaceKafkaClient is not sending any message to the WORKSPACE_SERVICE microservice? I tried handling this client in the onModule... functions the same way as accountKafkaClient, but it didn't help.

Here are my module settings as well:

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'ACCOUNT_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'account_service',
            brokers: ['localhost:29092'],
          },
          consumer: {
            groupId: 'account-consumer',
          },
        },
      },
      {
        name: 'WORKSPACE_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'workspace_service',
            brokers: ['localhost:29092'],
          },
          consumer: {
            groupId: 'workspace-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [ApiController],
  providers: [
    ApiService,
    // KafkaProducerProvider,
  ],
})
export class ApiModule {}

Thanks for any help!



Solution 1:[1]

You only need one producer client per application. Keep in mind, though, that Kafka producers never send data to the brokers immediately.

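You need to flush them for that to happen, which is what await firstValueFrom(...) should do, but you haven't shown where that function comes from. If it is the RxJS helper, it subscribes to the Observable returned by send(); that Observable is cold, so the request is only issued once something subscribes to it.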
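
As a minimal sketch of the single-client idea, assuming the NestJS request-response flow in which every pattern passed to send() is first registered with subscribeToResponseOf() before connect() is called: the helper below registers all reply topics on one client and then connects once. ReplyAwareClient, prepareClient and RecordingClient are hypothetical names introduced for illustration. The interface only mirrors the two ClientKafka methods already used in the question, and RecordingClient is an in-memory stand-in so the snippet runs without a broker.

```typescript
// Hypothetical structural type: only the two ClientKafka methods used in the question.
interface ReplyAwareClient {
  subscribeToResponseOf(pattern: string): void;
  connect(): Promise<unknown>;
}

// Register every reply topic first, then connect once.
async function prepareClient(client: ReplyAwareClient, patterns: string[]): Promise<void> {
  for (const pattern of patterns) {
    client.subscribeToResponseOf(pattern);
  }
  await client.connect();
}

// In-memory stand-in (not a real Kafka client) that records the call order.
class RecordingClient implements ReplyAwareClient {
  readonly calls: string[] = [];

  subscribeToResponseOf(pattern: string): void {
    this.calls.push(`subscribe:${pattern}`);
  }

  async connect(): Promise<void> {
    this.calls.push("connect");
  }
}

const recordingClient = new RecordingClient();
const ready = prepareClient(recordingClient, ["account.topic", "workspace.topic"]);
```

In the controller from the question, this would roughly correspond to calling prepareClient(this.accountKafkaClient, ['account.topic', 'workspace.topic']) inside onModuleInit and sending both requests through that one client.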

Otherwise, you seem to be trying to take the response from one topic and send it to another, which is what a consumer should be used for, rather than blocking on one producer request.
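
The consumer-based alternative can be sketched as follows, under the assumption that the reply to account.topic arrives on a topic of its own (named account.topic.reply here purely for illustration). InMemoryBus, TopicHandler and forward are hypothetical and stand in for a real broker, so this shows only the shape of the flow: a handler consumes a message and forwards it, and nothing blocks while waiting for a response.

```typescript
// Hypothetical handler type for messages delivered on a topic.
type TopicHandler = (message: string) => void;

// In-memory stand-in for a broker (an assumption for illustration, not Kafka).
class InMemoryBus {
  private readonly handlers = new Map<string, TopicHandler[]>();
  // Every emitted message, recorded as "topic:message" in emission order.
  readonly log: string[] = [];

  subscribe(topic: string, handler: TopicHandler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  emit(topic: string, message: string): void {
    this.log.push(`${topic}:${message}`);
    for (const handler of this.handlers.get(topic) ?? []) {
      handler(message);
    }
  }
}

// Consume from one topic and forward each message to another, fire-and-forget.
function forward(bus: InMemoryBus, from: string, to: string): void {
  bus.subscribe(from, (message) => bus.emit(to, message));
}

const bus = new InMemoryBus();
forward(bus, "account.topic.reply", "workspace.topic");
bus.emit("account.topic.reply", JSON.stringify({ msg: "account.topic" }));
```

In a NestJS service, the same role would typically be played by a handler decorated with @EventPattern that forwards the payload with the client's emit() method, though the exact wiring depends on the application.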

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1