'RxJava: Subscribing to a data structure that may have items in it

I'm working on a gRPC application where there is bidirectional streaming between two servers. When each server comes up, they make a gRPC request to get a list of logged in users, but they can also send each other updates of user events for logging in and logging out.

I am new to gRPC and RxJava, but I've combined them in such a way to wrap my StreamObserver responseObserver in an Observable, so for every onNext(), I can call onNext on my subscriber with the data I received.

As I mentioned, the idea for the message flow is first have each server reconcile users, and then exchange update events, but there is nothing stopping each server from sending update events whenever they want. So currently what I am doing is only sending my subscriber the list of users retrieved from the remote server, and I'd like to store any update events I get in a data structure.

When the reconcile finishes, which could be with error or success, I'd like to create another observable wrapped around the data structure so the subscriber can process whatever is in there, as well as get updates for newly emitted data into the data structure.

Why don't I just submit the update events to the subscriber doing the reconciliation? While the logic for update events and reconciling all users is pretty much the same, the way the whole project is modeled is to call a reconcile handler which will return a CompletableFuture, so the reconcile method is expected to return with a success or failed status, and only after that can the so-called "steady state sync" resume. I know it seems kind of silly, but this is what I am currently working with.

Is it feasible to wrap an Observable around some kind of concurrent linked queue?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source