程序员人生 网站导航

RxJava defer操作符实现代码支持链式调用

栏目:综合技术时间:2016-09-27 09:06:19

前言

现在愈来愈多Android开发者使用到RxJava,在Android使用RxJava主要有以下好处:
1,轻松切换线程。之前我们切换线程主要使用Handler等手段来做。
2,轻松解决回调的嵌套问题。现在的app业务逻辑愈来愈复杂,多的时候3,4层回调嵌套,使得代码可保护性变得很差。RxJava链式调用使得这些调用变得扁平化。

随着RxJava的流行,愈来愈多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。

但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是1些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。
如果还是想之前那样做,那就太low。

下面就来探讨下如何使得代码支持RxJava风格

遇到这类问题,在我脑海里显现的第1种方式就是通过Observable的create操作符。由于在里面我们可以控制数据的发射。就像上1篇文章那样《RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断》

以下代码片断:

Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { try { List<Article> as = articleDao.queryBuilder() .where(ArticleDao.Properties.CategoryId.eq(categoryId)) .orderDesc(ArticleDao.Properties.Id) .offset((pageIndex - 1) * pageSize) .limit(pageSize).list(); if (as == null || as.isEmpty()) { subscriber.onNext(null); }else{ subscriber.onNext(as); } }catch (Exception e){ subscriber.onError(e); } subscriber.onCompleted(); } });

这样确切没有无问题。但是我们要封装下, 每一个方法都这样写保护性和扩大比较差(例如有天我想换种方式来实现而不是create,如果通过方法封装1下,修改就变得容易多了)
如何封装呢?通过分析知道,大部份代码是相同的,只是我们的业务不1样。那末通过模板方法解决吧。业务方法通过接口回调的方式传递进来,由于我们不知道调用者是甚么业务。

回调接口以下(T表示我们业务数据):

public interface MyCallable<T> { T call(); }

下面是模板代码:

protected <R> Observable<R> createObservable(final MyCallable<R> callable) { return Observable.create(new Observable.OnSubscribe<R>() { @Override public void call(Subscriber<? super R> subscriber) { try { R result = callable.call(); subscriber.onNext(result); } catch (Exception e) { subscriber.onError(e); } subscriber.onCompleted(); } }); }

使用就非常简单了调用createObservable方法,实现MyCallable接口便可,然后就是跟使用RxJava1样处理逻辑。

分析greendao是如何支持RxJava风格的

看过Greendao源码的人知道,它也是通过这类方式支持RxJava的(下面看看他是怎样做的):

/** * Rx version of {@link AbstractDao#loadAll()} returning an Observable. */ @Experimental public Observable<T> load(final K key) { return wrap(new Callable<T>() { @Override public T call() throws Exception { return dao.load(key); } }); }

终究的实现也是通过dao.load(key)同步方法来实现的,关键是wrap方法了:

protected <R> Observable<R> wrap(Callable<R> callable) { return wrap(RxUtils.fromCallable(callable)); } //通过这个方法再包装了1层(就是默许设置履行的线程) protected <R> Observable<R> wrap(Observable<R> observable) { if (scheduler != null) { return observable.subscribeOn(scheduler); } else { return observable; } }

通过代码可以看到默许履行的线程是IO线程:

/** * The returned RxDao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for * subscribeOn. * * @see #rxPlain() */ @Experimental public RxDao<T, K> rx() { if (rxDao == null) { rxDao = new RxDao<>(this, Schedulers.io()); } return rxDao; }

所以使用greendao不用指定它在IO履行,由于sdk已帮我们设置了。

然后就是RxUtils.fromCallable(callable)方法了:

class RxUtils { /** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */ @Internal static <T> Observable<T> fromCallable(final Callable<T> callable) { return Observable.defer(new Func0<Observable<T>>() { @Override public Observable<T> call() { T result; try { result = callable.call(); } catch (Exception e) { return Observable.error(e); } return Observable.just(result); } }); } }

上面的注释说通过Observable.fromCallable也能够实现这样的逻辑,也就是说代替Observable.defer()方法。
最后greendao是通过defer操作符来实现rx风格的。

defer和create操作符有甚么异同点?

通过分析greendao源码得知,他是通过defer来做的,我们是通过create操作符来做的。那二者有甚么不同?

我们对defer操作符比较陌生,先看看它的源码:

public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) { return create(new OnSubscribeDefer<T>(observableFactory)); }

说白了就是调用了create(OnSubscribe<T> f) 方法:

public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }

其实我们上面的create操作也是调用过来这个方法。只是defer操作符传递的OnSubscribe是OnSubscribeDefer,那我们来看看这是甚么鬼?

public final class OnSubscribeDefer<T> implements OnSubscribe<T> { final Func0<? extends Observable<? extends T>> observableFactory; public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) { this.observableFactory = observableFactory; } @Override public void call(final Subscriber<? super T> s) { Observable<? extends T> o; try { o = observableFactory.call(); } catch (Throwable t) { Exceptions.throwOrReport(t, s); return; } o.unsafeSubscribe(Subscribers.wrap(s)); } }

OnSubscribeDefer也是继承自OnSubscribe,那末他的call方法肯定也是在定阅的时候被调用(就是说定阅的时候才创建这个observable,并且每次定阅都会创建1个新的observable)。
为何Greendao没有使用create那种方式精确控制数据的发射?现在RxJava2.0对create操作符做出了1些限制,不能马马虎虎create了,这样出现1些问题。具体的rxJav2.0的改动可以看看
他的github说明What’s-different-in⑵.0

关于RxJava的1些参考资料:
pitfalls-of-operator-implementations
subscribe vs unsafeSubscribe
What’s-different-in⑵.0

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐