Adel Nizamutdinov

I wanted to share this

Using RxJava Observable's Completion Semantics for Greater Good

When I first started with RxJava, it was all about easy, parametric concurrency for me. Later I discovered the power of functional composition and real code reuse. It still never ceases to amaze me, so today I want to talk about Observable’s completion semantics and the Subscriber.add(Subscription) method.

Unsubscription callback

Let’s look at a simple Observable implementation for OkHttp calls. Nothing fancy: we make a call, pass the response to the stream, and immediately end it:

Observable<Response> call(OkHttpClient client, Request request) {
  return Observable.create(subscriber -> {
    final Call call = client.newCall(request);
    try {
      subscriber.onNext(call.execute());
      subscriber.onCompleted();
    } catch (IOException e) {
      subscriber.onError(e);
    }
  });
}

This works, but sometimes we need to make a huge number of HTTP calls and ignore some of the responses, for example when adding autocompletion to an input field.

And guess what, RxJava works great for that kind of thing. We just need to add the unsubscription callback:

Observable<Response> call(OkHttpClient client, Request request) {
  return Observable.create(subscriber -> {
    final Call call = client.newCall(request);
    // Runs on unsubscription and cancels the call if it is still in flight.
    subscriber.add(Subscriptions.create(call::cancel));
    try {
      subscriber.onNext(call.execute());
      subscriber.onCompleted();
    } catch (IOException e) {
      subscriber.onError(e);
    }
  });
}

Now if we use Subscription.unsubscribe() or Observable.takeUntil(Observable), RxJava will call Call.cancel(), reducing unneeded network load. And we get that completely for free, thanks to Observable’s powerful semantics.
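To make the autocompletion case a bit more concrete, here is a rough sketch of the "latest request wins" idea in plain Java, with no RxJava or OkHttp involved. In RxJava itself you would typically get this behaviour from switchMap, which unsubscribes from the previous inner Observable when a new item arrives. FakeCall, LatestRequest and type are made-up names for illustration only, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, no RxJava or OkHttp involved. All names here are hypothetical.
final class AutocompleteSketch {

    /** Stand-in for an in-flight HTTP call; only cancel() matters for this sketch. */
    static final class FakeCall {
        final String query;
        boolean canceled;

        FakeCall(String query) { this.query = query; }

        void cancel() { canceled = true; }
    }

    /** Remembers how to cancel the latest request; replacing it cancels the old one. */
    static final class LatestRequest {
        private Runnable cancelCurrent = () -> { };

        void replace(Runnable cancelNext) {
            cancelCurrent.run();          // plays the role of the unsubscription callback
            cancelCurrent = cancelNext;
        }
    }

    /** Simulates typing: every new query starts a call and cancels the one before it. */
    static List<FakeCall> type(String... queries) {
        LatestRequest latest = new LatestRequest();
        List<FakeCall> calls = new ArrayList<>();
        for (String query : queries) {
            FakeCall call = new FakeCall(query);
            latest.replace(call::cancel);
            calls.add(call);
        }
        return calls;
    }
}
```

With type("r", "rx", "rxj"), the first two calls end up canceled and only the last one is left running, which is the network saving described above.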

Long ongoing processes

Let’s move to Android land and look at the MediaPlayer class. It’s one of the most “imperative” classes in the whole SDK, with its implicit state, exceptions thrown from almost every method, and really weird error propagation. So how can we bring it over to the safe and predictable Rx world?

OK, so if we want to represent an ongoing process as an Rx stream, we should first figure out what kind of values it emits. For MediaPlayer it’s the current playback position in milliseconds, an Integer:

Observable<Integer> stream(MediaPlayer mp) {
    return Observable.create(subscriber -> {
        subscriber.add(Subscriptions.create(() -> {
            if (mp.isPlaying()) {
                mp.stop();
            }
            mp.reset();
            mp.release();
        }));
        mp.start();
        subscriber.add(ticks(mp)
                               .takeUntil(complete(mp))
                               .subscribe(subscriber));
    });
}

Observable<Integer> ticks(MediaPlayer mp) {
    return Observable.interval(16, TimeUnit.MILLISECONDS)
            .map(y -> mp.getCurrentPosition());
}

Observable<MediaPlayer> complete(MediaPlayer player) {
    return Observable.create(subscriber -> player.setOnCompletionListener(mp -> {
        subscriber.onNext(mp);
        subscriber.onCompleted();
    }));
}

The main idea here is:

  1. A stream of ticks is an interval observable mapped to the MediaPlayer’s current position. It fires every 16 ms, which is roughly 60 times per second.
  2. A complete stream emits just one item: the moment MediaPlayer finishes playing.
  3. We start() the player, then emit the ticks until playback completes or the subscriber unsubscribes.
  4. I’m adding a second Subscription so that ticking also stops on unsubscription, because ticks is subscribed to as a separate, external Observable.
  5. The most important part is the first unsubscription callback, where the player gets released.

So now every time we subscribe to this stream, it starts playing audio and emitting the current playback position. If it stops – it releases the player. If we unsubscribe – it releases the player. If it errors – it releases the player.
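The three exits above can be modeled with a rough sketch in plain Java. It is not RxJava's implementation, just a simplified stand-in that assumes what RxJava 1 does as far as I understand it: a terminal event (onCompleted or onError) ends in an unsubscription, and a registered cleanup callback runs at most once. MiniSubscriber, FakePlayer and releasesAfter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "cleanup runs on every exit". All names are hypothetical.
final class CleanupSketch {

    /** Simplified stand-in for a subscriber: terminal events and unsubscribe() share one cleanup path. */
    static final class MiniSubscriber {
        private final List<Runnable> cleanups = new ArrayList<>();
        private boolean unsubscribed;
        final List<String> events = new ArrayList<>();

        void add(Runnable cleanup) {
            if (unsubscribed) { cleanup.run(); return; } // added too late: clean up right away
            cleanups.add(cleanup);
        }

        void onNext(int position) {
            if (!unsubscribed) events.add("next:" + position);
        }

        void onCompleted() {
            if (!unsubscribed) { events.add("completed"); unsubscribe(); }
        }

        void onError(Throwable e) {
            if (!unsubscribed) { events.add("error:" + e.getMessage()); unsubscribe(); }
        }

        void unsubscribe() {
            if (unsubscribed) return;                    // later calls are no-ops
            unsubscribed = true;
            for (Runnable cleanup : cleanups) cleanup.run();
            cleanups.clear();
        }
    }

    /** Stand-in for MediaPlayer that only counts how often it was released. */
    static final class FakePlayer {
        int releases;
        void release() { releases++; }
    }

    /** Plays two ticks, ends the stream the requested way, and reports the release count. */
    static int releasesAfter(String ending) {
        FakePlayer player = new FakePlayer();
        MiniSubscriber subscriber = new MiniSubscriber();
        subscriber.add(player::release);
        subscriber.onNext(0);
        subscriber.onNext(16);
        switch (ending) {
            case "complete": subscriber.onCompleted(); break;
            case "error":    subscriber.onError(new IllegalStateException("boom")); break;
            default:         subscriber.unsubscribe();
        }
        subscriber.unsubscribe();                        // an extra unsubscribe changes nothing
        return player.releases;
    }
}
```

Whichever ending is passed in ("complete", "error" or anything else for a plain unsubscribe), the fake player is released exactly once.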

You can think of Subscriber.add(Subscription) as the asynchronous counterpart of a finally block in a synchronous try-catch. We can represent any long-running task as a stream and get auto-cleanup and decent error propagation for free. It could be anything: phone calls, HTTP long-polling, audio recording. Hell, even UI animations!

This way any complex asynchronous long-running task may be reduced to a simple and composable Observable:

Observable<Integer> play(MediaPlayer mp) {
    return prepare(mp).flatMap(RxMediaPlayer::stream);
}

Keep composing!

PS

Here’s the complete code for the RxMediaPlayer

And bonus code for the RxMediaRecorder :)

Please note that they’re not production-ready because they both lack OnErrorListeners for decent error propagation and recovery.
