Dynamic Kafka topic name in NestJS microservice

In NestJS I am using Kafka as the message broker and set the topic name like this:

@MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
  // my code goes here
}

Is there any way to read the Kafka topic name from the config service module?



Solution 1:[1]

I handle this by creating a new custom decorator.

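import 'reflect-metadata';

// Metadata key for marked handlers. The answer does not show its value;
// any unique string works, and this one is an assumption.
export const KAFKA_TOPIC_METADATA = '__kafka-topic-candidate';

// Stores the AppConfig key on the handler; the topic itself is resolved later.
export function KafkaTopic(variable: string | keyof AppConfig): any {
  return (
    target: any,
    key: string | symbol,
    descriptor: PropertyDescriptor,
  ) => {
    Reflect.defineMetadata(
      KAFKA_TOPIC_METADATA,
      variable,
      descriptor.value,
    );
    return descriptor;
  };
}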

Then, at startup, a service applies MessagePattern to every marked method and takes the topic name from appConfig:

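import { Injectable, Logger } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Injectable()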
export class KafkaDecoratorProcessorService {
  constructor(
    private readonly LOG: Logger,
    private readonly appConfig: AppConfig,
  ) {
  }

  processKafkaDecorators(types: any[]) {
    for (const type of types) {
      const propNames = Object.getOwnPropertyNames(type.prototype);
      for (const prop of propNames) {
        const propValue = Reflect.getMetadata(
          KAFKA_TOPIC_METADATA,
          Reflect.get(type.prototype, prop),
        );

        if (propValue) {
          const topic = this.appConfig[propValue];
          this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
          Reflect.decorate(
            [MessagePattern(topic)],
            type.prototype,
            prop,
            Reflect.getOwnPropertyDescriptor(type.prototype, prop),
          );
        }
      }
    }
  }
}

This is how to run processKafkaDecorators in the main.ts file:

const app = await NestFactory.create(AppModule);
app
  .get(KafkaDecoratorProcessorService)
  .processKafkaDecorators([AppController]);

app.connectMicroservice({
  transport: Transport.KAFKA,
  ...
});

Note that you have to run it before connecting the microservice. Then use it like this:

@KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
async processMessage(
  @Payload() { value: payload }: { value: BookUpdateModel },
) {
  ...
}
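
The two-phase idea behind this solution (mark the handler with a config key when the class is defined, resolve the real topic once config is available) can be sketched without NestJS. This is only an illustration under stated assumptions: a WeakMap stands in for reflect-metadata, the decorator is applied by a plain function call instead of the @ syntax, and markTopic, resolveTopics, DemoConfig and BookController are hypothetical names that do not appear in the answer.

```typescript
// Stand-in for reflect-metadata: maps a handler function to its config key.
const topicKeys = new WeakMap<Function, string>();

// Hypothetical config shape; the real AppConfig is not shown in the answer.
type DemoConfig = Record<string, string>;

// Phase 1: runs when the class is defined and records only the config key.
function markTopic(configKey: string) {
  return (_target: object, _key: string, descriptor: PropertyDescriptor) => {
    topicKeys.set(descriptor.value, configKey);
    return descriptor;
  };
}

// Phase 2: runs at bootstrap, once the config values are known.
function resolveTopics(
  types: Array<new () => object>,
  config: DemoConfig,
): Map<string, string> {
  const resolved = new Map<string, string>();
  for (const type of types) {
    for (const prop of Object.getOwnPropertyNames(type.prototype)) {
      const value = type.prototype[prop];
      if (typeof value !== 'function') continue;
      const configKey = topicKeys.get(value);
      if (configKey === undefined) continue; // method was never marked
      const topic = config[configKey];
      if (topic === undefined) {
        throw new Error(`No config value for ${configKey}`);
      }
      resolved.set(`${type.name}#${prop}`, topic);
    }
  }
  return resolved;
}

class BookController {
  processMessage(): void {}
}

// Manual equivalent of placing @markTopic('KAFKA_TOPIC_BOOK_UPDATE') on the method.
const descriptor = Object.getOwnPropertyDescriptor(
  BookController.prototype,
  'processMessage',
)!;
markTopic('KAFKA_TOPIC_BOOK_UPDATE')(
  BookController.prototype,
  'processMessage',
  descriptor,
);

const topics = resolveTopics([BookController], {
  KAFKA_TOPIC_BOOK_UPDATE: 'book-updates',
});
console.log(topics.get('BookController#processMessage')); // prints "book-updates"
```

In the answer's version, phase 2 applies MessagePattern(topic) to the method instead of collecting the result in a map. Unlike the answer's code, the sketch throws when a key has no config value.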

Source

Solution 2:[2]

You can use process.env.VAR_NAME, like this:

@MessagePattern(process.env.MESSAGES_TOPIC)
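
Because process.env.MESSAGES_TOPIC is undefined when the variable is not set, it may help to fail fast rather than pass undefined to the decorator. The helper below is a minimal sketch: requireEnv is a hypothetical name and not part of NestJS, and it takes the environment as a parameter so the example runs on its own.

```typescript
// Hypothetical helper: returns the variable's value, or throws if it is unset or blank.
function requireEnv(
  env: Record<string, string | undefined>,
  name: string,
): string {
  // Trimming guards against stray whitespace around the value.
  const value = env[name]?.trim();
  if (!value) {
    throw new Error(`Environment variable ${name} is not set`);
  }
  return value;
}

// Intended use (an assumption, not from the answer):
//   @MessagePattern(requireEnv(process.env, 'MESSAGES_TOPIC'))
const demoEnv = { MESSAGES_TOPIC: 'topic_name' };
console.log(requireEnv(demoEnv, 'MESSAGES_TOPIC')); // prints "topic_name"
```

With this in place, a missing variable raises an error as soon as the controller file is loaded.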

One important thing to note is that a .env file will not work here: the decorator argument is evaluated when the controller file is imported, and ConfigService/dotenv loads too late for that. You need to set the environment variable before the application starts, which can be done in your package.json:

"scripts": {
  "start": "export MESSAGES_TOPIC=topic_name || SET \"MESSAGES_TOPIC=topic_name\" && nest start",
},
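
The reason the variable has to exist before startup is that a decorator's argument is evaluated once, when the class is defined. The sketch below illustrates that timing under stated assumptions: fakeEnv stands in for process.env, pattern stands in for MessagePattern, and the decorator is applied by a plain function call instead of the @ syntax. All names are hypothetical.

```typescript
// Stand-in for process.env so the sketch runs anywhere.
const fakeEnv: Record<string, string | undefined> = {};

// Stand-in for MessagePattern: remembers the topic it was given.
const registered: Array<string | undefined> = [];
function pattern(topic: string | undefined) {
  registered.push(topic); // runs as soon as the decorator expression is evaluated
  return (_target: object, _key: string, descriptor: PropertyDescriptor) =>
    descriptor;
}

class Consumer {
  handle(): void {}
}

// Class definition time: manual equivalent of @pattern(fakeEnv.MESSAGES_TOPIC).
const desc = Object.getOwnPropertyDescriptor(Consumer.prototype, 'handle')!;
pattern(fakeEnv.MESSAGES_TOPIC)(Consumer.prototype, 'handle', desc);

// Later, for example when a config module reads a .env file during bootstrap.
fakeEnv.MESSAGES_TOPIC = 'topic_name';

console.log(registered[0]); // prints "undefined": the late assignment is never seen
```

Setting the variable in the start script avoids this, because the value is already present when the controller file is first loaded.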

Although Ali's answer may work, I think it is too much for such a simple thing. It seems preferable not to use ConfigService for this specific case.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Fábio Magagnin