Late Subscribing and Polling APIs with RxAndroid

Howdy, lazy bum! Enjoying the ReactiveX magic? Want to take a look at polling?

I’ll be walking you through a solution I put together for one of our up and coming apps! It works rather well, I learned a lot, and so far no complaints…although there are no users yet either!

Feeling quite charitable, I’m going to let you in on some useful bits and pieces as we build up to polling: threading, late subscribing, replay, manual re-triggering and error handling (a must for preserving replays).

The samples are written for the Android platform (more specifically, we’re using RxAndroid), but they could (theoretically) be ported to other languages fairly easily.
I’ve created a gist gathering the full implementation.

If you don’t know about Reactive Extensions:
ReactiveX (RxJava, RxAndroid, RxJS, RxSwift, Rx.NET, etc.) is a powerful framework for functional programming (oh, how wonderful!).

It’s a change in paradigm. By pushing events/objects down a stream of functions, we acquire a lot of freedom when it comes to handling those objects.

It’s a comprehensive set of flexible and composable tools, allowing us to implement complex logic concisely.

In other words: no more AsyncTasks.
Here’s a good starting point: Grokking RxJava by Dan Lew.

THE API SERVICE

We’ll start by assuming the existence of a webService.getApiData() method returning an Observable<List<Data>>. When it gets a response from the API, it’ll emit its result down the observable stream.

Handily, Retrofit supports Observables as a return type in its interface declarations.

[code lang=”java”]
public interface Webservice {
  @GET("api/data")
  public Observable<List<Data>> getApiData();
}
[/code]

THREADING – COMPOSING

Blocks of code wrapped in an Observable can trivially be run on a thread of your choosing.
You can also trivially set the thread you’ll be receiving the response on.

[code lang=”java”]
public Observable<List<Data>> getData() {
  return webService.getApiData()
          // Execute the API call on a background (I/O) thread.
          .subscribeOn(Schedulers.io())
          .unsubscribeOn(Schedulers.io()) // (only necessary for Retrofit)
          // Receive the response on the main thread.
          .observeOn(AndroidSchedulers.mainThread());
}

dataService.getData().subscribe(dataList -> {
  // do something with the list.
});
[/code]
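
If the operators feel like magic, the same thread hop can be roughly sketched with the JDK alone. This is only an analogy, not how RxJava implements subscribeOn/observeOn. The executors `io` and `main` and the `fetch` helper are made-up stand-ins for Schedulers.io(), AndroidSchedulers.mainThread() and the web service.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
    static final ExecutorService io = Executors.newSingleThreadExecutor(r -> daemon(r, "io"));
    static final ExecutorService main = Executors.newSingleThreadExecutor(r -> daemon(r, "main"));

    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true); // do not keep the JVM alive for the sketch
        return t;
    }

    // Hypothetical blocking call standing in for the web service.
    static List<String> fetch() {
        return Arrays.asList("a", "b");
    }

    // Runs fetch() on the "io" thread, then handles the result on the "main" thread.
    static CompletableFuture<String> run() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName() + ":" + fetch().size(), io)
                .thenApplyAsync(worked -> worked + " -> " + Thread.currentThread().getName(), main);
    }
}
```

ThreadHopSketch.run().join() returns a string naming the thread that did the work and the thread that received the result. Back in Rx land, the chain above expresses the same two hops declaratively.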

Let’s make this a bit more concise and reusable:

[code lang=”java”]
// Tick
return webService.getApiData()
          .subscribeOn(Schedulers.io())
          .unsubscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
// Tack
return webService.getApiData()
      .compose(listObservable ->
              listObservable.subscribeOn(Schedulers.io())
                      .unsubscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
      );
// Tock
Observable.Transformer networkScheduling = observable ->
      observable.subscribeOn(Schedulers.io())
              .unsubscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread());
public Observable<List<Data>> getData() {
     return webService.getApiData()
// The compiler needs us to explicitly declare the transformer’s input and output types.
             .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling);
}
[/code]

Hmm… each subscriber will re-trigger a new API call.

LATE SUBSCRIBING – REPLAY

We don’t really want late subscribers to re-trigger the API call. It would be wasteful and time-consuming.

The first step is to return the same Observable instance: dataObservable. That, however, is not enough: each new subscriber would still trigger a new API call.

The second step, then, is to make this observable a ConnectableObservable and replay the previously emitted result.

[code lang=”java”]
ConnectableObservable<List<Data>> dataObservable;
public Observable<List<Data>> getData() {
  if (dataObservable == null) {
      dataObservable = webService.getApiData()
              .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
              .replay(1); // Replay the last X (1) result emitted

      dataObservable.connect(); // Connect the observable
  }
  return dataObservable;
}
[/code]

Note that this allows you to trigger the api call, and only later attach the view to the result.
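
To picture what replay(1) buys us, here is a rough JDK-only sketch of the idea: keep the last value around and hand it to whoever shows up late. It is not RxJava's implementation (no threading, no completion, no errors), and `ReplayLast`, `emit` and `subscribe` are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class: caches the most recent value and replays it to late subscribers.
class ReplayLast<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;
    private boolean hasValue;

    // Called by the single upstream source whenever a result arrives.
    synchronized void emit(T value) {
        last = value;
        hasValue = true;
        // Iterate over a copy so a subscriber may subscribe others while being notified.
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(value);
        }
    }

    // Late subscribers immediately get the cached value, without a new upstream call.
    synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(last);
        }
    }
}
```

Subscribe one consumer before the first emit and one after it: the late one still receives the cached value right away, and both receive whatever is emitted next.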

Hmm… 0-: we’re always returning the same result…

MANUAL RE-TRIGGERING

How to? We need a hook. No, not a hook; a Hook. A Hook who’s going to run amok and break the Neverland cycle and bring us back to the real world, where we actually get things done!

Setting dataObservable to null won’t work because that means the early subscribers won’t be receiving the new data (they are subscribed to the original observable).

Hmm…

Floating down the stream of Observable method signatures… floating… drowsily…

Aha!

repeatWhen(Observable) tick, tock, tick!
Re-subscribes to the source observable, once it has completed, whenever the specified Observable emits an item!

We have our Hook!

[code lang=”java”]
PublishSubject<Long> hook = PublishSubject.create();

public Observable<List<Data>> getData() {
  if (dataObservable == null) {
      dataObservable = webService.getApiData()
              .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
// repeat when the hook ticks
              .repeatWhen(observable -> hook)
              .replay(1);

      dataObservable.connect();
  }
  return dataObservable;
}

// Refresh Data
public void triggerDataFetch() {
  hook.onNext(-1L); // <<<<<<<<<<< Tick!
}
[/code]

Subjects are both observables and observers: they can take items in and emit them.
Every time we call hook.onNext(-1L), we emit an item, which triggers the repeat.
The type of item emitted by the Subject doesn’t matter much but, as you’ll see, we’ll need it to be a Long very soon.

POLLING

We’re close! Let’s let the clock do our ticking and we’ll have some polling!

[code lang=”java”]
public Observable<List<Data>> getData() {
  if (dataObservable == null) {
      dataObservable = webService.getApiData()
              .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
              .repeatWhen(observable ->
// Repeat every 5 minutes.
// tick, tack, tick, tick, t..
                      Observable.interval(5, TimeUnit.MINUTES)
              )
              .replay(1);

      dataObservable.connect();
  }
  return dataObservable;
}
[/code]

POLLING + MANUAL RE-TRIGGERING

Perhaps keeping control on when to trigger data fetches might be a good idea:

[code lang=”java”]
public Observable<List<Data>> getData() {
  if (dataObservable == null) {
      dataObservable = webService.getApiData()
              .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
              .repeatWhen(observable ->
                      // tick, tack, tick, tack, t…
                      Observable.interval(5, TimeUnit.MINUTES)
                              .mergeWith(hook) // <<<< Tick-Tock! surprise!
              )
              .replay(1);

      dataObservable.connect();
  }
  return dataObservable;
}

public void triggerDataFetch() {
  hook.onNext(-1L);
}
[/code]

To merge the interval and our hook, they need to be emitting the same type of objects. Thus the Long.

ERROR HANDLING

I made light of this previous polling segment.
There is, however, a fatal flaw in the replay mechanism we’ve set up so far.

With the code sample above, any exception will break our .replay() capability and unsubscribe every subscriber.
Observables make it really easy to catch exceptions from upstream and handle them in the subscriber. That capability, however, is only meant for unrecoverable errors.

In our case, there may be exceptions thrown by the API (or by some internal services), that won’t be catastrophic failures, but might just be caused by connectivity issues or system downtime.

Should we re-subscribe every time that happens? Can we reasonably and scalably handle that? Maybe, but ouch, and it’s the wrong way to handle recoverable errors anyway.

So how can I catch errors without finalizing the stream?

Observable.onErrorResumeNext(Observable) is the answer.

This operator will catch an error and, instead of calling onError (thus finishing), pass control of the stream to another observable.

That observable can just as easily pass the error along, query the API for something else, or query the local datastore.

Still not that simple. Downstream we’re expecting an object of type List<Data>. The error is of type Throwable. We do need to send that error down and let the subscribers know what’s happening upstream.

How about encapsulating an error or result in a templated object? A Wrapper<T>?

[code lang=”java”]
public static class Wrapper<ResponseType> {
  public Throwable error;
  public ResponseType response;

  private Wrapper() { }

  public static <T> Wrapper<T> error(Throwable t) {
      Wrapper<T> wrapper = new Wrapper<T>();
      wrapper.error = t;
      return wrapper;
  }

  public static <T> Wrapper<T> response(T response) {
      Wrapper<T> wrapper = new Wrapper<T>();
      wrapper.response = response;
      return wrapper;
  }

  public boolean hasError() { return error != null; }
}

ConnectableObservable<Wrapper<List<Data>>> dataObservable;

public Observable<Wrapper<List<Data>>> getData() {
  if (dataObservable == null) {
      dataObservable = webService.getApiData()
              .compose((Observable.Transformer<List<Data>, List<Data>>) networkScheduling)
              .map(dataList -> Wrapper.response(dataList))
              // Retry a few times before letting an error through.
              .retry(5)
              .onErrorResumeNext(throwable -> {
                  // If the server is unreachable, let the subscriber know.
                  // The user may be able to do something about his connection.
                  if (throwable instanceof UnknownHostException || throwable instanceof …) {
                      // Recoverable error – transmit error without breaking.
                      return Observable.just(Wrapper.error(throwable));
                  }
                  // In case of an unrecoverable error, let it end.
                  return Observable.error(throwable);
              })
              .repeatWhen(observable ->
                      Observable.interval(5, TimeUnit.MINUTES)
                          .mergeWith(hook)
              )
              .replay(1);

      dataObservable.connect();
  }
  return dataObservable;
}
 
// Consumer
dataService.getData().
      subscribe(wrapper -> {
          if (wrapper.hasError()) {
              // handle error
          } else {
              List<Data> list = wrapper.response;
              // do something.
          }
      }, e -> logger.error("Unrecoverable error getting data.", e));
[/code]
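
Stripped of the Rx machinery, the decision made inside onErrorResumeNext boils down to "turn recoverable failures into values, let everything else blow up". Here is a rough JDK-only sketch of that decision. `Outcome`, `capture` and `isRecoverable` are hypothetical names, and treating only UnknownHostException as recoverable is an assumption made for the example, not a recommendation.

```java
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

// Hypothetical holder mirroring the Wrapper idea: exactly one of the two fields is set.
class Outcome<T> {
    final T response;
    final Throwable error;

    private Outcome(T response, Throwable error) {
        this.response = response;
        this.error = error;
    }

    static <T> Outcome<T> response(T response) { return new Outcome<>(response, null); }
    static <T> Outcome<T> error(Throwable t) { return new Outcome<>(null, t); }
    boolean hasError() { return error != null; }

    // Assumption for this sketch: only UnknownHostException counts as recoverable.
    static boolean isRecoverable(Throwable t) {
        return t instanceof UnknownHostException;
    }

    // Runs the call; recoverable failures become values, anything else propagates.
    static <T> Outcome<T> capture(Callable<T> call) {
        try {
            return response(call.call());
        } catch (Exception e) {
            if (isRecoverable(e)) {
                return error(e);
            }
            throw new IllegalStateException("unrecoverable", e);
        }
    }
}
```

A caller checks hasError() exactly like the consumer above does with the wrapper, while genuinely broken calls still surface as exceptions.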

A WORD ON UNIT TESTING

Unit testing with RxJava and RxAndroid is remarkably simple. The difficulty would seemingly be in thread control.

  • For testing, instead of a normal Scheduler, you can use a TestScheduler which gives you control over its execution. 
  • Use testScheduler.triggerActions() to run the actions that are currently due on it and testScheduler.advanceTimeBy(5, TimeUnit.SECONDS) to advance its clock.
  • Instead of directly using Schedulers.io(), inject a build-specific scheduler into your service.
  • Observable.interval can take a Scheduler as third parameter. By providing a TestScheduler there and keeping a hook to it, you’ll be able to trigger the intervals at will.
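
If a scheduler you drive by hand sounds abstract, here is a rough JDK-only sketch of virtual time. It is not RxJava's TestScheduler, just an illustration of the principle: nothing runs until the test moves the clock. `VirtualClock`, `every` and `advanceBy` are names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical virtual clock: time only moves when the test says so.
class VirtualClock {
    private static final class Periodic {
        final long periodMs;
        final Runnable task;
        long nextRunMs;

        Periodic(long periodMs, long firstRunMs, Runnable task) {
            this.periodMs = periodMs;
            this.nextRunMs = firstRunMs;
            this.task = task;
        }
    }

    private final List<Periodic> tasks = new ArrayList<>();
    private long nowMs;

    // Registers a task that fires once per period, starting one period from now.
    void every(long periodMs, Runnable task) {
        if (periodMs <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        tasks.add(new Periodic(periodMs, nowMs + periodMs, task));
    }

    // Moves the clock forward, running every tick that falls due, earliest first.
    void advanceBy(long deltaMs) {
        long target = nowMs + deltaMs;
        while (true) {
            Periodic next = null;
            for (Periodic p : tasks) {
                if (p.nextRunMs <= target && (next == null || p.nextRunMs < next.nextRunMs)) {
                    next = p;
                }
            }
            if (next == null) {
                break;
            }
            nowMs = next.nextRunMs;
            next.nextRunMs += next.periodMs;
            next.task.run();
        }
        nowMs = target;
    }
}
```

With a five-minute task registered, advancing by four minutes runs nothing, one more minute runs it once, and ten more minutes run it twice more. That is the kind of control that makes polling code testable without waiting.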

CONCLUDING

Again, here is the gist: https://gist.github.com/MartinHarkins/ece31c41e436fa4cdc44

It took me a while to trim this algorithm down. I had to keep reminding myself to think in terms of the best user experience I could possibly provide (within timeframes, of course), and to think beyond the limitations I traditionally would have had.

What are all the things that should, could, or might happen? How can we fit all of that in just a few lines of code?

Remember/Imagine what it took to write code supporting all that functionality? Have you ever considered actually supporting all those use-cases? Late connections alone would be quite the dragon.

I’ve come up with one skeleton implementation. Consider that, on top of that, you might want some data persistence, enhanced connectivity handling, paging, etc.

Cheers!
