'Connecting two NestJS gRPC microservices on localhost

Introduction

I am quite new to NestJS, but really like how powerful it is so far, so I wanted to see how far I could get with using Kafka, gRPC and NestJS together.

My end goal is to have the following design, since I want to replicate my Kafka Producers:

gRPC client <---> gRPC server and Kafka client <----> Kafka

Kafka

I have a Kafka cluster which I have built in Kubernetes and which I reach via an L4 Load Balancer from NestJS with ease.

The Kafka side in NestJS is fine. I even rely on kafkajs and simply build out Consumers and Prodcuers using the kafkajs.Producer and kafkajs.Consumer classes accordingly, defining the configuration for the Kafka instance with a ConfigService.

gRPC

It is the gRPC side which I am struggling with as I am trying to let a gRPC server forward a request to a Kafka producer on behalf of the gRPC client.

Problem

I can start my gRPC server, but not my gRPC client. The client returns the following error from @grpc/grpc-js/src/server.ts:569:

[Nest] 50525  - 11/03/2022, 16:09:13     LOG [NestFactory] Starting Nest application...
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] AppModule dependencies initialized +17ms
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] ClientsModule dependencies initialized +0ms
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] GrpcClientModule dependencies initialized +0ms
E No address added out of total 2 resolved

/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569
              deferredCallback(new Error(errorString), 0);
                               ^
Error: No address added out of total 2 resolved
    at bindResultPromise.then.errorString (/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569:32)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

I am not sure why this happens, but I am suspicious that it has to do with the gRPC channel, or in other words communication link between the server and the client. However, I cannot find any documentation on this matter. Help would be greatly appreciated.

If you follow the stack trace you should notice the following in @grpc/grpc-js/src/server.ts:569:32, in other words that this error is in the @grpc/grpcjs library:

const resolverListener: ResolverListener = {
      onSuccessfulResolution: (
        addressList,
        serviceConfig,
        serviceConfigError
      ) => {
        // We only want one resolution result. Discard all future results 
        resolverListener.onSuccessfulResolution = () => {};
        if (addressList.length === 0) {
          deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
          return;
        }
        let bindResultPromise: Promise<BindResult>;
        if (isTcpSubchannelAddress(addressList[0])) {
          if (addressList[0].port === 0) {
            bindResultPromise = bindWildcardPort(addressList);
          } else {
            bindResultPromise = bindSpecificPort(
              addressList,
              addressList[0].port,
              0
            );
          }
        } else {
          // Use an arbitrary non-zero port for non-TCP addresses
          bindResultPromise = bindSpecificPort(addressList, 1, 0);
        }
        bindResultPromise.then(
          (bindResult) => {
            if (bindResult.count === 0) {
              const errorString = `No address added out of total ${addressList.length} resolved`;
              logging.log(LogVerbosity.ERROR, errorString);
              deferredCallback(new Error(errorString), 0);
            } else {
              if (bindResult.count < addressList.length) {
                logging.log(
                  LogVerbosity.INFO,
                  `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
                );
              }
              deferredCallback(null, bindResult.port);
            }
          },
          (error) => {
            const errorString = `No address added out of total ${addressList.length} resolved`;
            logging.log(LogVerbosity.ERROR, errorString);
            deferredCallback(new Error(errorString), 0);
          }
        );
      },
      onError: (error) => {
        deferredCallback(new Error(error.details), 0);
      },
    };

My code

Because gRPC requires a shared .proto file, I use the following:

syntax = "proto3";

package KAFKA_GRPC_SERVICE;

service KafkaGrpcService {
    rpc Produce(ProducerRequest) returns(Empty) {}
}

// See `kafkajs/types/ProducerRecord`
message ProducerRequest {
    
    // See `kafkajs/types/Message`
    message Message {
        required string value = 1;
        optional string key = 2;
        optional int32 partition = 3;
        optional string timestamp = 4;
    }

    required string topic = 1;
    repeated Message messages = 2;
    optional int32 acks = 3;
    optional int32 timeout = 4;
}

message Empty {}

I then autogenerate the interface required which gives me:

/**
* This file is auto-generated by nestjs-proto-gen-ts
*/

import { Observable } from 'rxjs';
import { Metadata } from '@grpc/grpc-js';

export namespace KAFKA_GRPC_SERVICE {
    export interface KafkaGrpcService {
        produce(data: ProducerRequest, metadata?: Metadata): Observable<Empty>;
    }
    // See &#x60;kafkajs/types/ProducerRecord&#x60;
    export interface ProducerRequest {
        topic: string;
        messages: ProducerRequest.Message[];
        acks?: number;
        timeout?: number;
    }
    export namespace ProducerRequest {
        // See &#x60;kafkajs/types/ProducerRecord&#x60;
        // See &#x60;kafkajs/types/Message&#x60;
        export interface Message {
            value: string;
            key?: string;
            partition?: number;
            timestamp?: string;
        }
    }
    // tslint:disable-next-line:no-empty-interface
    export interface Empty {
    }
}

Note I have tweaked some of the interface since it provides certain elements as optional, when I need a few of them to be required to be compatible with kafkajs/ProducerRecord.

My repository has two different NestJS apps, one called kafka-grpc-server and another called kafka-grpc-client respectively, so the code is slightly different. I won't post the ProducerService here for sake of brevity.

Server

Myconfig for the gRPC service is defined in a grpc.options.ts file that looks as follows:

import { Transport } from "@nestjs/microservices";
import { join } from "path";

export const grpcOptions = {
    transport: Transport.GRPC,
    options: {
        package: 'KAFKA_GRPC_SERVICE',
        url: 'localhost:5000',
        protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
    },
}

My Controller on the Server side looks like this:

import { Controller,Logger } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { KAFKA_GRPC_SERVICE } from './grpc/interfaces/kafkagrpcservice';
import { ProducerService } from "./kafka/producer/producer.service";

@Controller()
export class KafkaGrpcController implements KAFKA_GRPC_SERVICE.KafkaGrpcService {
    private logger = new Logger(KafkaGrpcController.name)
    constructor(private readonly kafkaProducer: ProducerService) {}

    @GrpcMethod('KafkaGrpcService', 'Produce')
    produce(data: KAFKA_GRPC_SERVICE.ProducerRequest, metadata: any): Observable<any> {
        this.logger.log('Producing message: {} with metadata: {}', data, metadata.toString());
        this.kafkaProducer.produce(data);
        return;
    }
}

and my main.ts server side is like this:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';

async function bootstrap() {
  const logger = new Logger('Main')
  const app = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
  await app.listen();
  logger.log(`Microservice is listening on '${grpcOptions.options.url}'`);

}
bootstrap();

I can successfully start the server without any issues, although I do notice that there is one process with two file descriptors listening on :5000, with TCP. I am not sure whether this is standard. First question I have is why are there two services, and not one when I just create one microservice? Is this a NestJS thing?

node      50355 mattia   27u  IPv6 *      0t0  TCP localhost:commplex-main (LISTEN)
node      50355 mattia   29u  IPv4 *      0t0  TCP localhost:commplex-main (LISTEN)

Client

Client side is slightly different. I use the same .proto file. However, the grpc.options.ts file is slightly different since this needs to be a client:

export const grpcOptions: ClientOptions = {
    transport: Transport.GRPC,
    options: {
        package: 'KAFKA_GRPC_SERVICE',
        url: 'localhost:5000',
        protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
    },
}

As you can see, ClientOptions are used for the client, but not for the server.

Client side, I have a GrpcClientModule which looks like this:

import { Module } from "@nestjs/common";
import { ClientsModule } from "@nestjs/microservices";
import { grpcOptions } from "../grpc.options";
import { GrpcClientController } from "./grpcclient.controller";

@Module({
    imports: [
        ClientsModule.register([
            {
                name: 'KAFKA_GRPC_SERVICE',
                ...grpcOptions,
            }
        ])
    ],
    controllers: [
        GrpcClientController,
    ],
})
export class GrpcClientModule {}

and the GrpClientController is like this:

import { Metadata } from "@grpc/grpc-js";
import { Body, Controller, Get, Inject, OnModuleInit, Post } from "@nestjs/common";
import { Client, ClientGrpc } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { grpcOptions } from "src/grpc.options";
import { KAFKA_GRPC_SERVICE } from "./interfaces/kafkagrpcservice";

@Controller()
export class GrpcClientController implements OnModuleInit {
    constructor(@Inject('KAFKA_GRPC_SERVICE') private client: ClientGrpc) {}
    private grpcService: KAFKA_GRPC_SERVICE.KafkaGrpcService;

    onModuleInit(): void {
        this.grpcService = this.client.getService<KAFKA_GRPC_SERVICE.KafkaGrpcService>('KafkaRpcService')
    }

    @Post('produce')
    async produce(data: KAFKA_GRPC_SERVICE.ProducerRequest): Promise<Observable<KAFKA_GRPC_SERVICE.ProducerResponse>> {
        const metadata = new Metadata();
        metadata.add('Set-Cookie', 'my_cookie=in_milk');
        return this.grpcService.produce( { topic: data.topic, messages: data.messages }, metadata );
    }
}

I start my client as follows:

import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, grpcOptions);
  await app.listen();
}
bootstrap();

Any help would be great!



Solution 1:[1]

So, I found the answer. It seems that it is impossible to start a single microservice as a gRPC server. Indeed, whenever I turn the microservice into a hybrid application as follows:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';

async function bootstrap() {
  const logger = new Logger('Main')
  const app = await NestFactory.create(KafkaGrpcServerModule);
  const grpcServer = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
  await app.startAllMicroservices();
  await app.listen(3000);

}
bootstrap();

I am able to connect the client.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Mattia Bradascio