When I first started with RxJava, for me it was all about easy and parametric concurrency. Later I discovered the power of functional composition and real code reuse. But it never ceases to amaze me, so today I want to talk about
Observable’s completion semantics and
Let’s look at the simple
Observable implementation for OkHttp calls. Nothing fancy, we make a call, we pass the response to the stream and immediately end it:
1 2 3 4 5 6 7 8 9 10 11
This works, but sometimes we need to make a huge amount of HTTP calls and ignore some of the responses: for example, when adding autocompletion to the input field.
And guess what, RxJava works great for that kind of thing, we just need to add the unsubscription callback:
1 2 3 4 5 6 7 8 9 10 11 12
Now if we use
Observable.takeUntil(Observable) RxJava will call
Call.cancel() reducing the unneeded network load. And we get that completely for free, thanks to Observable’s powerful semantics.
Long ongoing processes
Let’s move to the Android land and look at the
MediaPlayer class. It’s one of the most “imperative” classes in the whole SDK with it’s implicit state, throwing
Exceptions on almost every method and really weird error propagation. So how can we bring it over to the safe and predictable RX world?
Ok, so if we want to represent an ongoing process as an RX stream, we should first figure out what kinds of values it’s emitting. For
MediaPlayer it’s the time of the playback – an
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
The main idea here is:
- A stream of
ticksis a an interval observable mapped to the
MediaPlayer’s current position, and it’s triggered 60 times per second.
completestream emits just one item – the moment
start()the player, then emit the ticks until stopped or
- I’m adding a second
Subscriptionso that ticking also stops on unsubscription, because I’m calling it as an external
- The most important part is the first unsubscription callback, where the player gets
So now every time we subscribe to this stream, it starts playing audio and emitting the current playback position. If it stops – it releases the player. If we unsubscribe – it releases the player. If it errors – it releases the player.
You can think of asynchronous
Subscriber.add(Subscription) as a
finally block for a synchronous
try-catch. We can represent any long-running task as a stream and get auto-cleanup and decent error-propagation for free. It could be anything: phone calls, http long-polling, audio recording. Hell, even UI animations!
This way any complex asynchronous long-running task may be reduced to a simple and composable
1 2 3
Please note that they’re not production-ready because they both lack
OnErrorListeners for decent error propagation and recovery.