在说Observer与Subscriber的关系之前,我们下重温下相干概念。
RxJava 有4个基本概念:Observable (可视察者,即被视察者)、 Observer (视察者)、 subscribe (定阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现定阅关系,从而 Observable 可以在需要的时候发失事件来通知 Observer。
与传统视察者模式不同, RxJava 的事件回调方法除普通事件 onNext() (相当于 onClick() / onEvent())以外,还定义了两个特殊的事件:onCompleted() 和 onError()。
RxJava 的视察者模式大致以下图:
基于以上的概念, RxJava 的基本实现主要有3点:
1) 创建 Observer
Observer 即视察者,它决定事件触发的时候将有怎样的行动。 RxJava 中的 Observer 接口的实现方式:
Observer<Apps> observer = new Observer<Apps>() {
@Override
public void onCompleted() {
listView.onRefreshComplete();
}
@Override
public void onError(Throwable e) {
listView.onRefreshComplete();
}
@Override
public void onNext(Apps appsList) {
listView.onRefreshComplete();
appLists.addAll(appsList.apps);
adapter.notifyDataSetChanged();
}
};
除 Observer 接口以外,RxJava 还内置了1个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了1些扩大,但他们的基本使用方式是完全1样的:
Subscriber subscriber = new Subscriber<Apps>() {
@Override
public void onCompleted() {
listView.onRefreshComplete();
}
@Override
public void onError(Throwable e) {
listView.onRefreshComplete();
}
@Override
public void onNext(Apps appsList) {
listView.onRefreshComplete();
appLists.addAll(appsList.apps);
adapter.notifyDataSetChanged();
}
};
不但基本使用方式1样,实质上,在 RxJava 的 subscribe 进程中,Observer 也总是会先被转换成1个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全1样的。
public abstract class Subscriber<T> implements Observer<T>, Subscription
而onStart()方法是Subscriber中的1个方法。它也属于回调级别的。
subscribe(Subscriber)方法中有以下代码:
// if not already wrapped 包裹1层
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
他将subscriber包装起来,这个具体甚么意思有待研究,继续下看。
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
hook是甚么呢?
private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaObservableExecutionHook.java源码:
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
/**
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a
* default no-op implementation.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
* <p>
* <b>Note on thread-safety and performance:</b>
* <p>
* A single implementation of this class will be used globally so methods on this class will be invoked
* concurrently from multiple threads so all functionality must be thread-safe.
* <p>
* Methods are also invoked synchronously and will add to execution time of the observable so all behavior
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
* worker threads.
*
*/
public abstract class RxJavaObservableExecutionHook {
/**
* Invoked during the construction by {@link Observable#create(OnSubscribe)}
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass-thru the function.
*
* @param f
* original {@link OnSubscribe}<{@code T}> to be executed
* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
/**
* Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed.
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass-thru the function.
*
* @param onSubscribe
* original {@link OnSubscribe}<{@code T}> to be executed
* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}
/**
* Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned
* {@link Subscription}.
* <p>
* This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,
* metrics and other such things and pass-thru the subscription.
*
* @param subscription
* original {@link Subscription}
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
* pass-thru
*/
public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass-thru by default
return subscription;
}
/**
* Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable.
* <p>
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
*
* @param e
* Throwable thrown by {@link Observable#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass-thru
*/
public <T> Throwable onSubscribeError(Throwable e) {
// pass-thru by default
return e;
}
/**
* Invoked just as the operator functions is called to bind two operations together into a new
* {@link Observable} and the return value is used as the lifted function
* <p>
* This can be used to decorate or replace the {@link Operator} instance or just perform extra
* logging, metrics and other such things and pass-thru the onSubscribe.
*
* @param lift
* original {@link Operator}{@code <R, T>}
* @return {@link Operator}{@code <R, T>} function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
return lift;
}
}
RxJavaObservableExecutionHook类的作用很特殊,仿佛没有甚么太大的作用,传进去甚么(类型)参数,返回甚么(类型)参数。
以下代码所示:
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}
至于最后关键的返回结果:
public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass-thru by default
return subscription;
}
说白了,就是返回定阅的Observer对象。
它们的区分对使用者来讲主要有两点:
- onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做1些准备工作,例如数据的清零或重置。这是1个可选方法,默许情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出1个显示进度的对话框,这必须在主线程履行), onStart() 就不适用了,由于它总是在 subscribe 所产生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
- unsubscribe(): 这是 Subscriber 所实现的另外一个接口 Subscription 的方法,用于取消定阅。在这个方法被调用后,Subscriber 将不再接收事件。1般在这个方法调用前,可使用 isUnsubscribed() 先判断1下状态。 unsubscribe() 这个方法很重要,由于在 subscribe() 以后, Observable 会持有 Subscriber 的援用,这个援用如果不能及时被释放,将有内存泄漏的风险。所以最好保持1个原则:要在不再使用的时候尽快在适合的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来消除援用关系,以免内存泄漏的产生。
2) 创建 Observable
Observable 即被视察者,它决定甚么时候触发事件和触发怎样的事件。 RxJava 使用 create() 方法来创建1个 Observable ,并为它定义事件触发规则:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("John");
subscriber.onCompleted();
}
});
可以看到,这里传入了1个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于1个计划表,当 Observable 被定阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定顺次触发(对上面的代码,就是视察者Subscriber 将会被调用两次 onNext() 和1次 onCompleted())。这样,由被视察者调用了视察者的回调方法,就实现了由被视察者向视察者的事件传递,即视察者模式。
create() 是 RxJava 最基本的创造事件序列的操作符。基于这个操作符, RxJava 还提供了1些方法用来快捷创建事件队列,详见RxJava操作符系列文章:http://blog.csdn.net/jdsjlzx/article/details/51485861
3) Subscribe (定阅)
创建了 Observable 和 Observer 以后,再用 subscribe() 方法将它们联结起来,整条链子就能够工作了。代码情势很简单:
observable.subscribe(observer);
// 或:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
可以看到,subscriber() 做了3件事:
全部进程中对象间的关系以下图:
除 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完全定义的回调,RxJava 会自动根据定义创建出 Subscriber 。情势以下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 进程中终究会被转换成 Subscriber 对象,因此,从某种程度上来讲用 Subscriber 来代替 Observer ,这样会更加严谨。
根据目前的经验来看,Observer与Subscriber的主要区分在于onNext方法履行终了后是不是取消了定阅。
首先,我们分别定义mSubscriber 和 mObserver 。
以下代码:
protected Subscriber<D> mSubscriber = new Subscriber<D>() {
@Override
public void onCompleted() {
executeOnLoadFinish();
}
@Override
public void onError(Throwable e) {
TLog.error("onError " + e.toString());
executeOnLoadDataError(null);
}
@Override
public void onNext(D d) {
TLog.log("onNext " );
List<T> list = d;
TLog.log("entity " + list.size());
executeOnLoadDataSuccess(list);
TLog.log("onSuccess totalPage " + totalPage);
}
};
protected Observer<D> mObserver = new Observer<D>() {
@Override
public void onCompleted() {
executeOnLoadFinish();
}
@Override
public void onError(Throwable e) {
TLog.error("onError " + e.toString());
executeOnLoadDataError(null);
}
@Override
public void onNext(D d) {
TLog.log("onNext " );
List<T> list = d;
TLog.log("entity " + list.size());
executeOnLoadDataSuccess(list);
TLog.log("onSuccess totalPage " + totalPage);
}
};
observable.subscribeOn(Schedulers.io())
.map(new Func1<Response<D>,D>() {
@Override
public D call(Response<D> response) {
if(response == null){
throw new ApiException(100);
}
totalPage = response.total;
return response.result;
}
})
.observeOn(AndroidSchedulers.mainThread())
//.subscribe(mObserver);
.subscribe(mSubscriber);
subscribe(mObserver)和subscribe(mSubscriber)履行结果就会有区分:
提示:个人以为subscribe(mObserver)这个方式更合适分页加载。
请注意,如果你每次都使用subscribe(new Subscriber< T>() {})方式实现定阅,就不会出现上面的问题。
以下代码:
private void toSubscribe(Observable<Response<D>> observable) {
observable.subscribeOn(Schedulers.io())
.map(new Func1<Response<D>,D>() {
@Override
public D call(Response<D> response) {
if(response == null){
throw new ApiException(100);
}
totalPage = response.total;
return response.result;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<D>() {
@Override
public void onCompleted() {
executeOnLoadFinish();
}
@Override
public void onError(Throwable e) {
TLog.error("onError " + e.toString());
executeOnLoadDataError(null);
}
@Override
public void onNext(D d) {
TLog.log("onNext " );
List<T> list = d;
TLog.log("entity " + list.size());
executeOnLoadDataSuccess(list);
TLog.log("onSuccess totalPage " + totalPage);
}
});
}
固然,这个方式实现分页加载也是可以的。至于哪一个更好,还需要再验证。