'Angular 10 forkJoin cancelling requests

This should be a simple one for the angular pros out there. I have this simple method:

private addMultiple(products: Product[]): void {
    let observables: Observable<Product>[] = [];

    products.forEach((product: Product) => observables.push(this.productService.create(product)));

    forkJoin(observables).subscribe();
}

If I send one product to the method, it works fine, but if I try multiple (100) I just get loads of cancelled requests and I am not sure why.

Does anyone know if I should be using a different method?


This is my productService:

import { Inject, Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { finalize, map } from 'rxjs/operators';
import { BehaviorSubject, Observable } from 'rxjs';

import { Product, Attempt, RequestOptions } from '../models';
import { HttpServiceConfig, HTTP_SERVICE_CONFIG } from '../configs';

@Injectable({
providedIn: 'root',
})
export class ProductService {
    private endpoint: string = 'products';

    public items: BehaviorSubject<Product[]>;
    public loading: BehaviorSubject<boolean>;

    constructor(
        @Inject(HTTP_SERVICE_CONFIG) public config: HttpServiceConfig,
        private httpClient: HttpClient
    ) {
        this.items = new BehaviorSubject<Product[]>([]);
        this.loading = new BehaviorSubject<boolean>(false);
    }

    list(categoryId: number, options?: RequestOptions): Observable<Product[]> {
        this.loading.next(true);
        return this.httpClient
        .get<Attempt<Product[]>>(
            `${this.config.apiUrl}/categories/${categoryId}/${this.endpoint}/master`,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product[]>) => {
            if (response.failure) return response.result;
            this.items.next(response.result);
            return response.result;
            }),
            finalize(() => this.loading.next(false))
        );
    }

    get(id: number, slug: string, options?: RequestOptions): Observable<Product> {
        return this.httpClient
        .get<Attempt<Product>>(
            `${this.config.apiUrl}/${this.endpoint}/${id}?slug=${slug}`,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product>) => {
            return response.result;
            })
        );
    }

    public create(item: Product, options?: RequestOptions): Observable<Product> {
        return this.httpClient
        .post<Attempt<Product>>(
            `${this.config.apiUrl}/${this.endpoint}`,
            item,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product>) => {
            if (response.failure) return response.result;
            const newItem = response.result;
            const items = this.items.value;
            items.push(newItem);
            this.items.next(items);
            return response.result;
            })
        );
    }

    public updateSpecification(
        item: Product,
        options?: RequestOptions
    ): Observable<Product> {
        return this.httpClient
        .put<Attempt<Product>>(
            `${this.config.apiUrl}/${this.endpoint}/specification`,
            item,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product>) => {
            if (response.failure) return response.result;
            const newItem = response.result;
            const items = this.items.value;
            this.remove(items, newItem.id);
            items.push(newItem);
            this.items.next(items);
            return response.result;
            })
        );
    }

    public approve(item: Product, options?: RequestOptions): Observable<Product> {
        return this.httpClient
        .put<Attempt<Product>>(
            `${this.config.apiUrl}/${this.endpoint}/approve`,
            item,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product>) => {
            if (response.failure) return response.result;
            const newItem = response.result;
            const items = this.items.value;
            this.remove(items, newItem.id);
            items.push(newItem);
            this.items.next(items);
            return response.result;
            })
        );
    }

    public reject(item: Product, options?: RequestOptions): Observable<Product> {
        return this.httpClient
        .put<Attempt<Product>>(
            `${this.config.apiUrl}/${this.endpoint}/reject`,
            item,
            options?.getRequestOptions()
        )
        .pipe(
            map((response: Attempt<Product>) => {
            if (response.failure) return response.result;
            const newItem = response.result;
            const items = this.items.value;
            this.remove(items, newItem.id);
            items.push(newItem);
            this.items.next(items);
            return response.result;
            })
        );
    }

    private remove(items: Product[], id: number | string) {
        items.forEach((item, i) => {
        if (item.id !== id) {
            return;
        }
        items.splice(i, 1);
        });
    }
}


Solution 1:[1]

It may be that the server cancels requests if it gets hit by too many requests at almost the same time, which is what happens using forkJoin with a hundred of Observables.

forkJoin starts the "execution" of all the Observables in parallel.

If you want to control the number of parallel executions, you should try mergeMap with the concurrencyparameter set. This approach can be a bit more complex if you want to match the request with the response.

The code could look something like this

// set the number of requests you want to fly concurrently
const concurrency = 10;
// use from to generate an Observable from an array of Products
from(products).pipe(
  // use mergeMap to execute the request
  mergeMap(product => this.productService.create(product).pipe(
     // return an object which contains the result and the product which started the request
     map(result => ({result, product}))
  ), concurrency),  // set the concurrency in mergeMap
      // toArray transform the stream into an Array if this is what you want
  toArray()
).subscribe(
  res => {// do something with res, which is an array of objects containing objects with result and product}
)

Solution 2:[2]

It is important to note that if any requests fail during your create in a forkJoin, remaining requests will be cancelled.

You need to catchError on each create if you want to make sure all requests don't get cancelled.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 BizzyBob
Solution 2 jsgoupil