'Connecting two NestJS gRPC microservices on localhost
Introduction
I am quite new to NestJS, but really like how powerful it is so far, so I wanted to see how far I could get with using Kafka, gRPC and NestJS together.
My end goal is to have the following design, since I want to replicate my Kafka Producers:
gRPC client <---> gRPC server and Kafka client <----> Kafka
Kafka
I have a Kafka cluster which I have built in Kubernetes and which I reach via an L4 Load Balancer from NestJS with ease.
The Kafka side in NestJS is fine. I even rely on kafkajs and simply build out Consumers and Prodcuers using the kafkajs.Producer and kafkajs.Consumer classes accordingly, defining the configuration for the Kafka instance with a ConfigService.
gRPC
It is the gRPC side which I am struggling with as I am trying to let a gRPC server forward a request to a Kafka producer on behalf of the gRPC client.
Problem
I can start my gRPC server, but not my gRPC client. The client returns the following error from @grpc/grpc-js/src/server.ts:569:
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [NestFactory] Starting Nest application...
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] AppModule dependencies initialized +17ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] ClientsModule dependencies initialized +0ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] GrpcClientModule dependencies initialized +0ms
E No address added out of total 2 resolved
/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569
deferredCallback(new Error(errorString), 0);
^
Error: No address added out of total 2 resolved
at bindResultPromise.then.errorString (/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569:32)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
I am not sure why this happens, but I am suspicious that it has to do with the gRPC channel, or in other words communication link between the server and the client. However, I cannot find any documentation on this matter. Help would be greatly appreciated.
If you follow the stack trace you should notice the following in @grpc/grpc-js/src/server.ts:569:32, in other words that this error is in the @grpc/grpcjs library:
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) {
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
return;
}
let bindResultPromise: Promise<BindResult>;
if (isTcpSubchannelAddress(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
}
} else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(
(bindResult) => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
deferredCallback(null, bindResult.port);
}
},
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
}
);
},
onError: (error) => {
deferredCallback(new Error(error.details), 0);
},
};
My code
Because gRPC requires a shared .proto file, I use the following:
syntax = "proto3";
package KAFKA_GRPC_SERVICE;
service KafkaGrpcService {
rpc Produce(ProducerRequest) returns(Empty) {}
}
// See `kafkajs/types/ProducerRecord`
message ProducerRequest {
// See `kafkajs/types/Message`
message Message {
required string value = 1;
optional string key = 2;
optional int32 partition = 3;
optional string timestamp = 4;
}
required string topic = 1;
repeated Message messages = 2;
optional int32 acks = 3;
optional int32 timeout = 4;
}
message Empty {}
I then autogenerate the interface required which gives me:
/**
* This file is auto-generated by nestjs-proto-gen-ts
*/
import { Observable } from 'rxjs';
import { Metadata } from '@grpc/grpc-js';
export namespace KAFKA_GRPC_SERVICE {
export interface KafkaGrpcService {
produce(data: ProducerRequest, metadata?: Metadata): Observable<Empty>;
}
// See `kafkajs/types/ProducerRecord`
export interface ProducerRequest {
topic: string;
messages: ProducerRequest.Message[];
acks?: number;
timeout?: number;
}
export namespace ProducerRequest {
// See `kafkajs/types/ProducerRecord`
// See `kafkajs/types/Message`
export interface Message {
value: string;
key?: string;
partition?: number;
timestamp?: string;
}
}
// tslint:disable-next-line:no-empty-interface
export interface Empty {
}
}
Note I have tweaked some of the interface since it provides certain elements as optional, when I need a few of them to be required to be compatible with kafkajs/ProducerRecord.
My repository has two different NestJS apps, one called kafka-grpc-server and another called kafka-grpc-client respectively, so the code is slightly different. I won't post the ProducerService here for sake of brevity.
Server
Myconfig for the gRPC service is defined in a grpc.options.ts file that looks as follows:
import { Transport } from "@nestjs/microservices";
import { join } from "path";
export const grpcOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
My Controller on the Server side looks like this:
import { Controller,Logger } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { KAFKA_GRPC_SERVICE } from './grpc/interfaces/kafkagrpcservice';
import { ProducerService } from "./kafka/producer/producer.service";
@Controller()
export class KafkaGrpcController implements KAFKA_GRPC_SERVICE.KafkaGrpcService {
private logger = new Logger(KafkaGrpcController.name)
constructor(private readonly kafkaProducer: ProducerService) {}
@GrpcMethod('KafkaGrpcService', 'Produce')
produce(data: KAFKA_GRPC_SERVICE.ProducerRequest, metadata: any): Observable<any> {
this.logger.log('Producing message: {} with metadata: {}', data, metadata.toString());
this.kafkaProducer.produce(data);
return;
}
}
and my main.ts server side is like this:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.listen();
logger.log(`Microservice is listening on '${grpcOptions.options.url}'`);
}
bootstrap();
I can successfully start the server without any issues, although I do notice that there is one process with two file descriptors listening on :5000, with TCP. I am not sure whether this is standard. First question I have is why are there two services, and not one when I just create one microservice? Is this a NestJS thing?
node 50355 mattia 27u IPv6 * 0t0 TCP localhost:commplex-main (LISTEN)
node 50355 mattia 29u IPv4 * 0t0 TCP localhost:commplex-main (LISTEN)
Client
Client side is slightly different. I use the same .proto file. However, the grpc.options.ts file is slightly different since this needs to be a client:
export const grpcOptions: ClientOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
As you can see, ClientOptions are used for the client, but not for the server.
Client side, I have a GrpcClientModule which looks like this:
import { Module } from "@nestjs/common";
import { ClientsModule } from "@nestjs/microservices";
import { grpcOptions } from "../grpc.options";
import { GrpcClientController } from "./grpcclient.controller";
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_GRPC_SERVICE',
...grpcOptions,
}
])
],
controllers: [
GrpcClientController,
],
})
export class GrpcClientModule {}
and the GrpClientController is like this:
import { Metadata } from "@grpc/grpc-js";
import { Body, Controller, Get, Inject, OnModuleInit, Post } from "@nestjs/common";
import { Client, ClientGrpc } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { grpcOptions } from "src/grpc.options";
import { KAFKA_GRPC_SERVICE } from "./interfaces/kafkagrpcservice";
@Controller()
export class GrpcClientController implements OnModuleInit {
constructor(@Inject('KAFKA_GRPC_SERVICE') private client: ClientGrpc) {}
private grpcService: KAFKA_GRPC_SERVICE.KafkaGrpcService;
onModuleInit(): void {
this.grpcService = this.client.getService<KAFKA_GRPC_SERVICE.KafkaGrpcService>('KafkaRpcService')
}
@Post('produce')
async produce(data: KAFKA_GRPC_SERVICE.ProducerRequest): Promise<Observable<KAFKA_GRPC_SERVICE.ProducerResponse>> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'my_cookie=in_milk');
return this.grpcService.produce( { topic: data.topic, messages: data.messages }, metadata );
}
}
I start my client as follows:
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, grpcOptions);
await app.listen();
}
bootstrap();
Any help would be great!
Solution 1:[1]
So, I found the answer. It seems that it is impossible to start a single microservice as a gRPC server. Indeed, whenever I turn the microservice into a hybrid application as follows:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.create(KafkaGrpcServerModule);
const grpcServer = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
I am able to connect the client.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Mattia Bradascio |
