'Rxjs how to get all values that are buffered during a concatMap

Consider the following stream:

interval(1000)
  .pipe(
    take(5),
    concatMap((val) => {
      console.log(val, 'process');
      return of(val).pipe(delay(3000));
    })
  )
  .subscribe((val) => console.log(val, 'emit'));

As expected, 0 emits and hits the concat map. We see 'process' logged. During this time, 1 and 2 have emitted from the source. What i'd like to do is say "Now that my concatmap has finished, give me all items that are currently in the stream." So the next emit would be [1, 2], and we would see '[1,2] process'.

I am not sure how to achieve this. I have tried using a buffer and emitting every time concatMap emits, but it never gets an initial emit on that buffer, which resorted in hacking with timers and race and even then it did not work very well.

This is my current solution:

const dequeueSignal$ = new Subject();

interval(5000)
  .pipe(
    buffer(dequeueSignal$),
    concatMap((val) => {
      console.log(val, 'process');
      return of(val).pipe(
        delay(getRandomInt(20000)),
        tap(() => dequeueSignal$.next(null))
      );
    })
  )
  .subscribe((val) => console.log(val, 'emit'));

dequeueSignal$.next(null)

This is the closest I can get. This means that if theres nothing in the buffer, it still emits and the cycle continues. However, this has major drawbacks:

  • While nothing is in the queue, its constantly looping (the delay is only there for debug purposes. In my real scenario, this is a http call)
  • Relies on an external call to start the whole thing off.

As such, this feels hacky and brittle. Is there an operator set I can use to create this scenario?



Solution 1:[1]

If I understand the problem right, I would proceed like this.

First we isolate the source stream. Consider that we use the share operator to make sure that the source$ stream is shared by the other Observables we are going to create later on starting from source$.

const source$ = interval(1000).pipe(share(), take(5));

Then I think that delay you put in the example represents some kind of processing performed by some kind of function, probably a call to an async service. If this is true, then we could have a process function which returns an Observable. A simulated version of this function could be this

function process(val) {
  return of(val).pipe(
    delay(3000),
    tap({
      next: (val) => console.log(val, "processed"),
    })
  );
}

now, if all these assumptions are true, then we can define a dequeueSignal$ Subject, as in your example, and then 2 different streams, a stream that takes just the first element notified by source$ and a stream that take all other elements, like this

const dequeueSignal$ = new Subject<any>();

const first$ = source$.pipe(
  first(),
  concatMap((val) => {
    return process(val).pipe(
      tap({
        complete: () => {
          dequeueSignal$.next(null);
        },
      })
    );
  })
);

const afterFirst$ = source$.pipe(skip(1)).pipe(
  buffer(dequeueSignal$),
  concatMap((val) => {
    return process(val).pipe(
      tap({
        complete: () => {
          dequeueSignal$.next(null);
        },
      })
    );
  })
);

dequeueSignal$ is used to trigger the release of the buffer stored with the buffer operator.

dequeueSignal$ is nexted in the first$ stream once and any time the afterFirst$ stream notifies.

A notification of dequeueSignal$ triggers the release of the buffered items.

Here a stackblitz that shows the code.

Probably a more elegant solution can be implemented as a variation of the mergeMap operator code, but it may look a bit more complex.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1