现在愈来愈多Android开发者使用到RxJava,在Android使用RxJava主要有以下好处:
1,轻松切换线程。之前我们切换线程主要使用Handler等手段来做。
2,轻松解决回调的嵌套问题。现在的app业务逻辑愈来愈复杂,多的时候3,4层回调嵌套,使得代码可保护性变得很差。RxJava链式调用使得这些调用变得扁平化。
随着RxJava的流行,愈来愈多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。
但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是1些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。
如果还是想之前那样做,那就太low。
遇到这类问题,在我脑海里显现的第1种方式就是通过Observable的create操作符。由于在里面我们可以控制数据的发射。就像上1篇文章那样《RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断》
以下代码片断:
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
List<Article> as = articleDao.queryBuilder()
.where(ArticleDao.Properties.CategoryId.eq(categoryId))
.orderDesc(ArticleDao.Properties.Id)
.offset((pageIndex - 1) * pageSize)
.limit(pageSize).list();
if (as == null || as.isEmpty()) {
subscriber.onNext(null);
}else{
subscriber.onNext(as);
}
}catch (Exception e){
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
这样确切没有无问题。但是我们要封装下, 每一个方法都这样写保护性和扩大比较差(例如有天我想换种方式来实现而不是create,如果通过方法封装1下,修改就变得容易多了)
如何封装呢?通过分析知道,大部份代码是相同的,只是我们的业务不1样。那末通过模板方法解决吧。业务方法通过接口回调的方式传递进来,由于我们不知道调用者是甚么业务。
回调接口以下(T表示我们业务数据):
public interface MyCallable<T> {
T call();
}
下面是模板代码:
protected <R> Observable<R> createObservable(final MyCallable<R> callable) {
return Observable.create(new Observable.OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> subscriber) {
try {
R result = callable.call();
subscriber.onNext(result);
} catch (Exception e) {
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}
使用就非常简单了调用createObservable方法,实现MyCallable接口便可,然后就是跟使用RxJava1样处理逻辑。
看过Greendao源码的人知道,它也是通过这类方式支持RxJava的(下面看看他是怎样做的):
/**
* Rx version of {@link AbstractDao#loadAll()} returning an Observable.
*/
@Experimental
public Observable<T> load(final K key) {
return wrap(new Callable<T>() {
@Override
public T call() throws Exception {
return dao.load(key);
}
});
}
终究的实现也是通过dao.load(key)同步方法来实现的,关键是wrap方法了:
protected <R> Observable<R> wrap(Callable<R> callable) {
return wrap(RxUtils.fromCallable(callable));
}
//通过这个方法再包装了1层(就是默许设置履行的线程)
protected <R> Observable<R> wrap(Observable<R> observable) {
if (scheduler != null) {
return observable.subscribeOn(scheduler);
} else {
return observable;
}
}
通过代码可以看到默许履行的线程是IO线程:
/**
* The returned RxDao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for
* subscribeOn.
*
* @see #rxPlain()
*/
@Experimental
public RxDao<T, K> rx() {
if (rxDao == null) {
rxDao = new RxDao<>(this, Schedulers.io());
}
return rxDao;
}
所以使用greendao不用指定它在IO履行,由于sdk已帮我们设置了。
然后就是RxUtils.fromCallable(callable)
方法了:
class RxUtils {
/** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */
@Internal
static <T> Observable<T> fromCallable(final Callable<T> callable) {
return Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
T result;
try {
result = callable.call();
} catch (Exception e) {
return Observable.error(e);
}
return Observable.just(result);
}
});
}
}
上面的注释说通过Observable.fromCallable也能够实现这样的逻辑,也就是说代替Observable.defer()方法。
最后greendao是通过defer操作符来实现rx风格的。
通过分析greendao源码得知,他是通过defer来做的,我们是通过create操作符来做的。那二者有甚么不同?
我们对defer操作符比较陌生,先看看它的源码:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
return create(new OnSubscribeDefer<T>(observableFactory));
}
说白了就是调用了create(OnSubscribe<T> f)
方法:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
其实我们上面的create操作也是调用过来这个方法。只是defer操作符传递的OnSubscribe是OnSubscribeDefer,那我们来看看这是甚么鬼?
public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
final Func0<? extends Observable<? extends T>> observableFactory;
public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}
@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
o.unsafeSubscribe(Subscribers.wrap(s));
}
}
OnSubscribeDefer也是继承自OnSubscribe,那末他的call方法肯定也是在定阅的时候被调用(就是说定阅的时候才创建这个observable,并且每次定阅都会创建1个新的observable)。
为何Greendao没有使用create那种方式精确控制数据的发射?现在RxJava2.0对create操作符做出了1些限制,不能马马虎虎create了,这样出现1些问题。具体的rxJav2.0的改动可以看看
他的github说明What’s-different-in⑵.0
关于RxJava的1些参考资料:
pitfalls-of-operator-implementations
subscribe vs unsafeSubscribe
What’s-different-in⑵.0