RxOAuth2: reactive OAuth approach for Android, pt. 1
TL;DR: together with colleagues at Ackee, we developed RxOAuth2, an open source library that handles OAuth with RxJava streams. You can take a look at it directly or read this blog post to find out why and how we implemented it. The second part is coming soon.
For the last couple of years, all the Android apps we develop at Ackee have been based on Rx streams. We get responses from our REST API (Retrofit) as Singles and Completables, read data from our database with Flowables (Room), and listen to user events and update our UI with Observables. First it was RxJava 1, then we switched to RxJava 2, but it is still the reactive world we’re used to living in.
When we started to use Retrofit, one of the problems we had to solve on each of our projects was authorization. Since our backend uses the OAuth2 standard, we need to handle the token lifespan as well. That’s how the requirements for our library came about:
- It should persist the credentials (access token, refresh token, etc.)
- It should create the authorization header for our requests and add the access token to it if one exists
- It should handle an invalid/expired access token and request a new one with the help of the refresh token
- It should automatically repeat the failed request with the new token after a successful refresh
- It should provide a fallback option if the token refresh fails as well (e.g. log the user out of the app and clear the credentials)
After RxJava was integrated into our projects, it became even easier to implement such a library, so we started on the first version of RxOAuth. There were some pitfalls, though.
The first requirement was easy: we created a class called [OAuthStore](https://github.com/AckeeCZ/rxoauth/blob/master/core/src/main/java/cz/ackee/rxoauth/OAuthStore.kt), which stores and provides the credentials. In the first versions it was public, but later we made it internal so that the user is left with a single facade class.
Since we use Retrofit, the most obvious way to implement the second requirement was to use an OkHttp interceptor. [OAuthInterceptor](https://github.com/AckeeCZ/rxoauth/blob/master/core/src/main/java/cz/ackee/rxoauth/OAuthInterceptor.kt) is provided with the [OAuthStore](https://github.com/AckeeCZ/rxoauth/blob/master/core/src/main/java/cz/ackee/rxoauth/OAuthStore.kt) internally by [RxOAuthManager](https://github.com/AckeeCZ/rxoauth/blob/master/core/src/main/java/cz/ackee/rxoauth/RxOAuthManager.kt), and if the access token is present, it creates the authorization header and adds the token to it.
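As a rough illustration of the header step, here is a minimal, dependency-free sketch. The `authorizationHeader` helper and the `Bearer` scheme are assumptions made for this example; in the library this logic lives inside the OkHttp interceptor.

```kotlin
// Hypothetical helper, not part of RxOAuth: builds the header an interceptor would attach.
// Assumes the common "Bearer" scheme; returns null when no usable access token is stored.
fun authorizationHeader(accessToken: String?): Pair<String, String>? =
    accessToken
        ?.takeIf { it.isNotBlank() }
        ?.let { "Authorization" to "Bearer $it" }
```

An interceptor would call something like this for every outgoing request and leave the request untouched when the result is `null`.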
The remaining requirements are related to RxJava entities, since all our requests live in the reactive world. According to the OAuth standard, if the token is expired or invalid, the server returns 401; this is how we decide whether a refresh is needed. However, 401 sometimes has another meaning (in the case of login, for example), so we need to be able to exclude some requests from the refreshing mechanism (more on this later). Retrofit converts 401 to an `HttpException` and sends it to the reactive stream as an error. So all we need is to use the `onErrorResumeNext` operator and check whether the error is an `HttpException` with a 401 code. If it is, we refresh the token and `flatMap` to the upstream observable again.
For those of you who aren’t so familiar with RxJava, this may seem to make no sense at all. Let’s take a look at part of `RxOAuthManager`, the main class of the library:

    fun <T> transformSingle(): SingleTransformer<T, T> {
        return SingleTransformer { upstream ->
            if (oAuthStore.tokenExpired()) {
                // Token is known to be stale: refresh first, then run the request
                refreshTokenObservable!!.flatMapSingle { upstream }.firstOrError()
            } else {
                upstream.onErrorResumeNext { throwable ->
                    if (errorChecker.invalidAccessToken(throwable)) {
                        // Server rejected the token: refresh, then repeat the request
                        refreshTokenObservable!!.flatMapSingle { upstream }.firstOrError()
                    } else Single.error(throwable)
                }
            }
        }
    }

This function returns an RxJava transformer. Transformers let you compose RxJava entities and optionally transform them to another element type. The upstream is the part of the stream before the transformation; the downstream is the resulting stream. In our case, we don’t need to change the type of the `Single`; we just need to act on the stream: check the error type, perform the token refresh and repeat the upstream chain again (this is done with a `flatMap`-style operator, `flatMapSingle` in the code above).
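If the reactive version is hard to follow, the same control flow as `transformSingle()` can be written as a blocking, RxJava-free analogy. This is only a sketch to show the order of steps; `callWithRefresh` and all its parameters are hypothetical names, not the library's API.

```kotlin
// Blocking analogy of the refresh-and-repeat flow; names are illustrative only.
fun <T> callWithRefresh(
    tokenExpired: () -> Boolean,
    isInvalidAccessToken: (Throwable) -> Boolean,
    refreshToken: () -> Unit,
    request: () -> T
): T {
    if (tokenExpired()) {
        // The token is known to be stale: refresh first and skip the doomed request.
        refreshToken()
        return request()
    }
    return try {
        request()
    } catch (e: Exception) {
        // Unrelated errors are passed through untouched.
        if (!isInvalidAccessToken(e)) throw e
        // The server rejected the token: refresh once, then repeat the request.
        refreshToken()
        request()
    }
}
```

The `catch` branch plays the role of `onErrorResumeNext`, and calling `request()` again corresponds to re-subscribing to the upstream.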
`oAuthStore.tokenExpired()` checks the token TTL locally, if it is stored in `OAuthStore`, to prevent redundant requests.
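A local TTL check of this kind could look roughly like the sketch below. The class, its fields and the injectable clock are assumptions for illustration and do not mirror the real `OAuthStore`.

```kotlin
// Hypothetical store sketch; names are assumptions, not OAuthStore's API.
class TokenStoreSketch(private val now: () -> Long = { System.currentTimeMillis() }) {
    var accessToken: String? = null
        private set
    private var expiresAtMillis: Long? = null

    fun save(accessToken: String, expiresInSeconds: Long?) {
        this.accessToken = accessToken
        // Remember an absolute deadline so later checks need no network call.
        expiresAtMillis = expiresInSeconds?.let { now() + it * 1000 }
    }

    // True only when an expiry time is stored and it has already passed.
    fun tokenExpired(): Boolean = expiresAtMillis?.let { now() >= it } ?: false
}
```

When the server sends no expiry information, this sketch never reports the token as expired and leaves the decision to the 401 check.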
`errorChecker: ErrorChecker` is just an interface that checks whether the error from Retrofit (or any other `Throwable`) indicates token expiration. The [DefaultErrorChecker](https://github.com/AckeeCZ/rxoauth/blob/development/core/src/main/java/cz/ackee/rxoauth/DefaultErrorChecker.kt) is our implementation for the HTTP standard.
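To make the idea concrete, here is a small sketch of such a checker. The two method names follow the calls shown in this post; `HttpError` is a hypothetical stand-in for Retrofit's `HttpException`, and the status codes chosen for the refresh token case are an assumption, not necessarily what `DefaultErrorChecker` does.

```kotlin
// Stand-in for Retrofit's HttpException so the sketch has no dependencies (hypothetical).
class HttpError(val code: Int) : RuntimeException("HTTP $code")

// Method names follow the calls used in this post; everything else is assumed.
interface ErrorChecker {
    fun invalidAccessToken(t: Throwable): Boolean
    fun invalidRefreshToken(t: Throwable): Boolean
}

// Assumption: 401 means a rejected access token, 400 or 401 a rejected refresh token.
class StatusCodeErrorChecker : ErrorChecker {
    override fun invalidAccessToken(t: Throwable): Boolean =
        t is HttpError && t.code == 401

    override fun invalidRefreshToken(t: Throwable): Boolean =
        t is HttpError && (t.code == 400 || t.code == 401)
}
```

Keeping this behind an interface lets a project with a non-standard backend plug in its own rules without touching the refresh logic.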
`refreshTokenObservable` is the `Observable` that wraps the call to the refresh token endpoint of your API. But before we take a look at the code, there is a tricky part to it.
Suppose you have a place in your app that makes two independent requests to the API simultaneously, and suppose that your access token is expired at that moment. If both requests are wrapped with our refresh logic, both of them will call your refresh token endpoint. This may cause unwanted synchronization issues. It depends on your backend implementation, but one possible problem is that the first request is repeated after its refresh, then the second refresh invalidates the access token used by the first request, and it will fail. So we need to ensure that when multiple API requests are running (and failing) simultaneously, only one token refresh is performed and all the requests are repeated with the new token.
That’s how we solved it:
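
    private var refreshTokenObservable: Observable<OAuthCredentials>? = null

    init {
        initRefreshTokenObservable()
    }

    private fun initRefreshTokenObservable() {
        // defer: build the refresh call only on subscription, so it reads the current refresh token
        refreshTokenObservable = Single.defer { refreshAccessToken() }
            .toObservable()
            // share: concurrent subscribers reuse one in-flight refresh call
            .share()
            // prepare a fresh observable for the next refresh
            .doOnComplete { initRefreshTokenObservable() }
    }

    private fun refreshAccessToken(): Single<OAuthCredentials> {
        val refreshToken = oAuthStore.refreshToken ?: ""
        return refreshTokenAction(refreshToken)
            .doOnSuccess { credentials -> saveCredentials(credentials) }
            .doOnError { throwable ->
                // The refresh token itself was rejected: clear state and notify the app
                if (errorChecker.invalidRefreshToken(throwable)) {
                    clearCredentials()
                    onRefreshTokenFailed(throwable)
                }
            }
    }
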
`refreshTokenAction` is a function provided by the library user that takes a `String` (the refresh token) and returns a `Single` with [OAuthCredentials](https://github.com/AckeeCZ/rxoauth/blob/development/core/src/main/java/cz/ackee/rxoauth/OAuthCredentials.kt) (an interface for the access/refresh token pair and other optional parameters). In other words, it is the refresh token call to your REST API.
The `defer()` operator creates the underlying source (a `Single` here) only when an observer subscribes to the stream, which ensures the freshness of the data (the refresh token in our case). The `share()` operator multicasts the same original source as long as at least one subscriber listens to it. It means that if a request is already running and refreshing the token, all other requests will reuse the result of this token refresh call without making the request again.
Once the refresh completes and all observers are done, the shared `refreshTokenObservable` terminates as well, so we run `initRefreshTokenObservable()` again on completion to prepare a fresh one for the next refresh.
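The "one refresh in flight, then reset" idea can also be shown without RxJava. The sketch below swaps in `CompletableFuture` from the JDK as a plain analogy of `share()` plus the re-initialization step; `SharedRefresh` is a hypothetical class and not how the library is implemented.

```kotlin
import java.util.concurrent.CompletableFuture

// Analogy only: callers that arrive while a refresh is running get the same future.
class SharedRefresh<T>(private val action: () -> CompletableFuture<T>) {
    private val lock = Any()
    private var inFlight: CompletableFuture<T>? = null

    fun refresh(): CompletableFuture<T> = synchronized(lock) {
        inFlight ?: action().also { started ->
            inFlight = started
            // Reset once the call finishes, so the next caller triggers a new refresh.
            started.whenComplete { _, _ ->
                synchronized(lock) {
                    if (inFlight === started) inFlight = null
                }
            }
        }
    }
}
```

Two requests that fail at the same time would both call `refresh()`, but only the first one starts the network call; a request that fails later, after the future has completed, starts a new one.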
The last block just checks whether the token refresh failed as well, and if so, it clears the stored credentials and runs the callback `onRefreshTokenFailed: (Throwable) -> Unit` provided by the library user.
I hope that now it all comes together. Take a look at the core module of our library for more details. We support the main RxJava entities: `Observable`, `Single` and `Completable`.
We have created convenient Kotlin extensions that wrap these operators for the refresh functionality, but we still have to wrap each of our requests with them manually. We didn’t want this redundant copy-pasting, so we came up with a couple of solutions to automatically integrate our RxOAuth logic into our REST API layer. You will find out about them in the second part of this series.