How to not create your RxJS Observables

profile
Tim Deschryver
timdeschryver.dev

Last week a friend and college of mine was stuck with a problem. An NgRx effect was subscribed to a stream of WebSocket messages, but the effect didn't receive any messages. Though, we saw that the server was sending them and that they reached the client.

The problem wasn't the Effect, but the WebSocket stream that was wrongfully initialized.

The use case was to only establish the WebSocket connection for the users that had enough permissions to start a process. The WebSocket was created to report the progress of this process to the rest of the users.

The simplified version looks like this:

This doesn't work because the steam$ is reassignment to the "real" WebSocket stream after the Effect was initialized. When the WebSocket stream emits a new progress value, the Effect doesn't receive the update because it is listening to of({ status: 'idle' })

So how do we solve this? Mostly, the answer to that question when it comes to RxJS is to wrap the Observable inside another Observable.

Simplified reproduction link

To reproduce this in a simple way, I created 2 streams. One stream is listening for "a" keydown events, the second stream is listening to "b" keydown events. At first we're interested in the "a" events, and when the button (toggle) is clicked, we only want to receive the "b" events.

When the toggle is clicked, we still only receive "a" events

Implementation One: The Imperative Way link

To stay in the imperative world we can re-create this if statement inside an outer Observable. So, we start off by using the "a"-events and when the button is clicked, we switch the inner Observable to return the "b"-event stream. In the code below we use a RxJS Subject to recreate the toggle.

When the toggle is clicked, we only receive "b" events - this is what we want

While this works, we can do better.

Implementation Two: Let's Think In Streams link

Instead of re-creating a new stream with a Subject, why not re-use a stream? The toggle$ stream is exactly what we need to switch between the two streams, and it's already there!

We don't receive "a" events, when the toggle is clicked we do receive the "b" events

The above doesn't take the "a"-stream into account, it just creates the "b"-stream when the toggle emits a value. For our use-case this was perfect, but if needed we can provide an initial value.

With an initial value link

By using the startWith operator, we can start-off the stream with a single "a" value.

Before we do anything, we already receive an "a". We only receive "b" events after the toggle is clicked

With an initial stream link

Or, if you're interested in the "a"-stream you can use the concat method in combination with the takeUntil operator. This will handle all streams sequentially.

For our code this means that it will first emit all the "a" events, and when the toggle is clicked it switches to the "b" events.

First, we receive "a" events. After the button is clicked, we receive "b" events

Conclusion link

By wrapping the Observable (the inner-Observable) in an other Observable (the outer-Observable), the reference to the Observable remains the same. In the Observable we forsee a way to switch between the two Observables. This way, in our case, the NgRx Effect works as intended.

format_quote

There are multiple operators that provide a solution to different use case, try the RxJS Operator Decision Tree to find your solution

You can play around with the code in this Blitz.

Feel free to update this blog post on GitHub, thanks in advance!

Join My Newsletter (WIP)

Join my weekly newsletter to receive my latest blog posts and bits, directly in your inbox.

Support me

I appreciate it if you would support me if have you enjoyed this post and found it useful, thank you in advance.

Buy Me a Coffee at ko-fi.com PayPal logo

Share this post

Twitter LinkedIn