RxRecipes: Piping into a Subject

Written by scottmeschke | Published 2016/12/26
Tech Story Tags: rxjava | android | rxswift | reactive-programming | rxjs


Image Credit: wwardo@flickr

The RxRecipes series generally refers to Android and RxJava2, but the same recipes can apply to any Rx implementation and platform (RxJs, RxSwift, a backend with RxJava, etc).

Previous RxRecipe: Wrap your way to Rx

Challenge: Reactive source from Lifecycle Events

In my previous article, I talked about a simple way to convert an imperative API to a reactive one (with fromCallable). Often, however, it isn’t that simple.

Imagine you need some component to react to Activity lifecycle events.

  • Cleaning up a cache when your Activity stops.
  • Recording a specific analytics event every time your Activity pauses.
  • Binding some Android specific component (like a TextWatcher) to the Activity’s start.

However, you immediately run into the problem of where to put your Rx code. There is no simple method or entry point.

These hooks fire at different points, and can fire multiple times and in different orders depending on user input, application logic, and operating system events (like a Facebook message pop-up triggering onPause()).

We want to get to something like this:
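
A minimal sketch of that goal, assuming a hypothetical ActivityEvent enum and a lifecycle stream of those events (neither name comes from a real API). Observable.just(...) stands in for a real Activity here so the snippet runs on its own:

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class LifecycleGoal {
    // Hypothetical event type; the names are assumptions for illustration.
    enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

    // Reacts to a lifecycle stream and returns a log of the actions taken.
    static List<String> react(Observable<ActivityEvent> lifecycle) {
        List<String> actions = new ArrayList<>();
        // Clean up a cache whenever the Activity stops.
        lifecycle.filter(event -> event == ActivityEvent.STOP)
                .subscribe(event -> actions.add("clear cache"));
        // Record an analytics event whenever the Activity pauses.
        lifecycle.filter(event -> event == ActivityEvent.PAUSE)
                .subscribe(event -> actions.add("track pause"));
        return actions;
    }

    public static void main(String[] args) {
        // Stand-in for the event stream a real Activity would expose.
        Observable<ActivityEvent> lifecycle = Observable.just(
                ActivityEvent.CREATE, ActivityEvent.START, ActivityEvent.RESUME,
                ActivityEvent.PAUSE, ActivityEvent.STOP);
        System.out.println(react(lifecycle));
    }
}
```

The client code only filters and subscribes; it never needs to know which callback produced an event.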

But how do we get there?

The plan is coming together quite nicely… Muah-ha-ha-ha!

Solution: Use a Subject!

From the official documentation:

Represents an Observer and an Observable at the same time, allowing multicasting events from a single source to multiple child Subscribers.

Let’s review that: a Subject is BOTH an observer and observable.

While Subjects have many possible use-cases, they are often used as a sort of multi-source emitter.

We will take multiple sources (in this case multiple method callbacks) and pipe them all into a single concise subject.

Subjects are great! They can hide the complexity of the input, and simplify the client API to treat it as a single source.

Let’s see the code

Note: Before we start with the code, it should be noted this is a pretty common approach to the Android lifecycle callbacks. For more examples see RxLifecycle from Trello, or the recently open sourced Kickstarter Android App.

The first thing we need is some type to emit.

We could create a class that carries some state or metadata (like including the saved instance state Bundle with the create event), but for simplicity let’s just use a simple enum.
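
One possible shape for that enum, as a sketch. The name ActivityEvent and the choice of constants are assumptions, not taken from a library:

```java
// Hypothetical event type: one constant per Activity lifecycle callback.
public enum ActivityEvent {
    CREATE,   // pushed from onCreate()
    START,    // pushed from onStart()
    RESUME,   // pushed from onResume()
    PAUSE,    // pushed from onPause()
    STOP,     // pushed from onStop()
    DESTROY   // pushed from onDestroy()
}
```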

Now we just need to give the activity its own subject and push the proper instances of these events at the right time.

  • We create the subject with .create() as soon as the new instance of the class is initialized
  • We call onNext() on the subject with the appropriate event at the appropriate time
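
The two steps above can be sketched as follows. To keep the example runnable off-device, a plain Java class stands in for the Activity; the class name, method visibility, and event names are assumptions. In a real app the class would extend android.app.Activity and each method would be an @Override that calls super first.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.BehaviorSubject;

// Plain-Java stand-in for an Activity (assumption: a real one would extend
// android.app.Activity and call super.onX() in each callback).
public class LifecycleActivity {
    // Hypothetical event type, one constant per callback.
    public enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

    // Created with create() as soon as the instance is initialized.
    private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();

    // Clients get an Observable view of the subject, not the subject itself.
    public Observable<ActivityEvent> lifecycle() {
        return lifecycleSubject.hide();
    }

    // Each callback pipes its event into the single subject.
    public void onCreate()  { lifecycleSubject.onNext(ActivityEvent.CREATE); }
    public void onStart()   { lifecycleSubject.onNext(ActivityEvent.START); }
    public void onResume()  { lifecycleSubject.onNext(ActivityEvent.RESUME); }
    public void onPause()   { lifecycleSubject.onNext(ActivityEvent.PAUSE); }
    public void onStop()    { lifecycleSubject.onNext(ActivityEvent.STOP); }
    public void onDestroy() { lifecycleSubject.onNext(ActivityEvent.DESTROY); }

    public static void main(String[] args) {
        LifecycleActivity activity = new LifecycleActivity();
        activity.onCreate();
        activity.onStart();
        // A late subscriber first receives the current state (START),
        // then every event that follows.
        activity.lifecycle().subscribe(event -> System.out.println("event: " + event));
        activity.onResume();
        activity.onPause();
    }
}
```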

Which Subject to Use?

We use a BehaviorSubject here so that any subscriber immediately gets the most recently emitted (current) lifecycle state of the activity.

A simplified overview of the main included subjects in RxJava2 is as follows:

  • AsyncSubject: Emits only the last item, and only once the source completes. If the source errors, it emits just the error.
  • BehaviorSubject: Emits the most recent item (if there is one) and then all subsequent items.
  • PublishSubject: Emits only the items emitted after subscription.
  • ReplaySubject: Emits all of the items that have been emitted up to this point, and then all subsequent items.
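
To make the differences concrete, here is a small sketch that feeds the same sequence into each subject type and records what a late subscriber receives. The class and method names are made up for this example:

```java
import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
import java.util.ArrayList;
import java.util.List;

public class SubjectComparison {
    // Emits "a" and "b", subscribes late, then emits "c" and completes.
    // Returns everything the late subscriber received.
    static List<String> lateSubscriber(Subject<String> subject) {
        List<String> received = new ArrayList<>();
        subject.onNext("a");
        subject.onNext("b");
        subject.subscribe(received::add);
        subject.onNext("c");
        subject.onComplete();
        return received;
    }

    public static void main(String[] args) {
        // Only the last item, delivered on completion.
        System.out.println("Async:    " + lateSubscriber(AsyncSubject.create()));    // [c]
        // The most recent item at subscription time, then the rest.
        System.out.println("Behavior: " + lateSubscriber(BehaviorSubject.create())); // [b, c]
        // Only items emitted after subscription.
        System.out.println("Publish:  " + lateSubscriber(PublishSubject.create()));  // [c]
        // Everything, including items emitted before subscription.
        System.out.println("Replay:   " + lateSubscriber(ReplaySubject.create()));   // [a, b, c]
    }
}
```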

Don’t forget to .hide()

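We call hide() so that clients receive a plain Observable rather than the BehaviorSubject itself. Without it, a client could cast the Observable back to a Subject and push its own events with onNext(), or otherwise mess with the internals of the behavior subject (like the underlying BehaviorDisposable).

When a client unsubscribes, we want to dispose only the connection between that subscriber and the subject, NOT the subject itself or its underlying subscription.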
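
A short sketch of what hide() changes, using only the subject and a hypothetical helper (canBeCastToSubject is not part of RxJava):

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

public class HideDemo {
    // Hypothetical helper: could a client reach the Observer side of this stream?
    static boolean canBeCastToSubject(Observable<?> observable) {
        return observable instanceof Subject;
    }

    public static void main(String[] args) {
        BehaviorSubject<String> subject = BehaviorSubject.create();

        // Returned as-is, the subject is only upcast, so clients can cast it back.
        System.out.println(canBeCastToSubject(subject));        // true
        // hide() wraps it, so onNext() and friends are out of reach.
        System.out.println(canBeCastToSubject(subject.hide())); // false

        // Disposing a client's subscription only detaches that client.
        Disposable client = subject.hide().subscribe(System.out::println);
        subject.onNext("START");  // printed by the client
        client.dispose();
        subject.onNext("PAUSE");  // not printed; the subject itself is still live
        System.out.println(subject.getValue()); // PAUSE
    }
}
```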

Next in the RxRecipes Series:

TBD — I’ll update this with a link to the next post once it’s done.

Enjoy the recipe?

Disagree with the implementation?

Have burning-hot fiery opinions about some random sentence above?

I’d love to hear from you, leave your comments below!


Published by HackerNoon on 2016/12/26