Custom hook to subscribe to an array of observables and take the latest value from each
I have an array of Observables, streams$, each of which emits a single value when its associated async operation completes. I want to aggregate the results in an array arr, where arr[i] is undefined if streams$[i] hasn't completed and the resolved value of streams$[i] if it has. For 3 observables, the hook should return the following:
[undefined, undefined, undefined] // 1
['A', undefined, undefined] // 2
['A', undefined, 'C'] // 3
['A', 'B', 'C'] // 4
// done, unsubscribe
This is what I have currently:
const useLatest = <T>(streams$: Observable<T>[]) => {
  const [state, setState] = useState<T[]>(Array(streams$.length).fill(undefined));
  const latest$ = combineLatest(streams$.map($ => $.pipe(startWith(undefined))));
  useEffect(() => {
    const subscription = latest$.subscribe((values) => setState(values));
    return () => {
      subscription.unsubscribe();
    }
  }, []);
  return state;
}
This gives me almost the correct return values (it prints the all-undefined array twice), but because of the empty dependency array it will not resubscribe if streams$ becomes a different array of new observables. Everything else I've tried to fix this, such as piping latest$ to takeWhile with its second parameter set to true or piping each streams$[i] to take(2), results in infinite emissions of either [undefined, undefined, undefined] or ['A', 'B', 'C'].
Solution 1:[1]
Toward a solution
Not sure why you can't put streams as a dependency. I've never used React (or React hooks), but I've read up on the basics. It looks like you'll want the effect to clean up and re-run whenever you get a new streams array (by reference). If I understand correctly, the dependency array works by reference equality.
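import { useEffect, useState } from 'react';
import { Observable, combineLatest } from 'rxjs';
import { filter, startWith } from 'rxjs/operators';

const useLatest = <T>(streams: Observable<T>[]) => {
  // Slots stay undefined until the matching stream emits.
  const [state, setState] = useState<(T | undefined)[]>(Array(streams.length).fill(undefined));
  useEffect(() => {
    const sub = combineLatest(
      streams.map($ => $.pipe(
        // Seed each stream so combineLatest emits before every stream has a value.
        startWith(undefined)
      ))
    ).pipe(
      // Skip the all-undefined emission; the initial state already covers it.
      filter(latest => !latest.every(v => v === undefined))
    ).subscribe((values) => setState(values));
    // Unsubscribe on unmount or when a new streams array is passed in.
    return sub.unsubscribe.bind(sub);
  }, [streams]);
  return state;
}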
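To make the reference-equality point concrete, here is a minimal sketch in plain TypeScript, with no React or RxJS involved. The helper depsChanged is hypothetical and only imitates the documented behaviour of comparing each dependency to its previous value with Object.is. The objects a and b are stand-ins for observables.

```typescript
// Hypothetical helper (not a React API): imitates the dependency check by
// comparing each entry with Object.is, which for objects and arrays means
// "same reference".
function depsChanged(prev: readonly unknown[], next: readonly unknown[]): boolean {
  if (prev.length !== next.length) return true;
  return prev.some((dep, i) => !Object.is(dep, next[i]));
}

// Stand-ins for two observables.
const a = { name: "a$" };
const b = { name: "b$" };

// An array literal written inline is a new reference on every render,
// even when it holds exactly the same elements.
const render1 = [a, b];
const render2 = [a, b];
const freshArrayChanged = depsChanged([render1], [render2]);

// Reusing one array across renders keeps the reference stable.
const stable = [a, b];
const stableArrayChanged = depsChanged([stable], [stable]);

console.log({ freshArrayChanged, stableArrayChanged });
```

If the component calls useLatest([a$, b$]) with an inline array, the [streams] dependency would count as changed on every render, so the effect resubscribes, sets state, and triggers another render. That is one plausible cause of the endless emissions described in the question. Keeping the array reference stable in the caller (for example with useMemo or by defining it outside the component) should avoid it.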
Some other operators in action :P
Something like this should work the same.
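import { useEffect, useState } from 'react';
import { Observable, merge } from 'rxjs';
import { map, scan } from 'rxjs/operators';

const useLatest = <T>(streams: Observable<T>[]) => {
  const defaultState = (): (T | undefined)[] => Array(streams.length).fill(undefined);
  const [state, setState] = useState<(T | undefined)[]>(defaultState());
  useEffect(() => {
    const sub = merge(...streams.map((s, i) => s.pipe(
      // Tag each value with the index of the stream it came from.
      map(v => ({ i, v }))
    ))).pipe(
      scan((acc, { i, v }) => {
        // Copy before writing: React skips the re-render if setState
        // receives the same array reference it already holds.
        const next = [...acc];
        next[i] = v;
        return next;
      }, defaultState())
    ).subscribe((values) => setState(values));
    return sub.unsubscribe.bind(sub);
  }, [streams]);
  return state;
}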
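The accumulation step can also be looked at on its own. The following is a minimal sketch in plain TypeScript, without RxJS or React, of folding indexed emissions into the result array. The names Emission and applyEmission are made up for illustration, and the emission order is taken from the example in the question. The reducer returns a copy rather than writing into the accumulator, because React skips a re-render when setState receives the same reference it already holds.

```typescript
// Hypothetical shape of a tagged emission: the stream index and its value.
type Emission<T> = { i: number; v: T };

// Pure reducer: returns a new array with slot i filled in, leaving the
// previous array untouched so every step has its own reference.
function applyEmission<T>(
  acc: readonly (T | undefined)[],
  { i, v }: Emission<T>
): (T | undefined)[] {
  const next = [...acc];
  next[i] = v;
  return next;
}

// Three streams, none of which has emitted yet.
const initial: (string | undefined)[] = new Array<string | undefined>(3).fill(undefined);

// Emission order from the question: A first, then C, then B.
const emissions: Emission<string>[] = [
  { i: 0, v: "A" },
  { i: 2, v: "C" },
  { i: 1, v: "B" },
];

// Record every intermediate state, starting with the initial one.
const history: (string | undefined)[][] = [initial];
for (const e of emissions) {
  history.push(applyEmission(history[history.length - 1], e));
}

console.log(history);
```

Each entry of history is a distinct array, which is what lets a state setter tell successive values apart.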
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
