Rxjava 系列目录
Rxjava3
博客创建时间:2021.04.11
博客更新时间:2021.04.15
以Android studio build=4.1.3,gradle=6.5,SdkVersion 30来分析讲解。如图文和网上其他资料不一致,可能是别的资料版本较低而已
前言
依赖配置
implementation "io.reactivex.rxjava3:rxjava:3.0.0"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
Rxjava特点:
- Rxjava使用观察者模式,响应式编程。
- 简单实现异步操作,两行代码实现线程切换
Rxjava与RxAndroid
以rxjava2:rxandroid:2.1.1来分析,RxAndroid中只有4个class文件
观察者模式
Android 中的观察者模式,Rxjava中有两个重要的类Observable和Observer,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable)。
通过创建Observable发射数据流,经过一系列操作符(Operators)加工处理和线程调度器(Scheduler)在不同线程间的转发,最后由观察者接受并做出响应的一个过程。
下面我们以一个点击事件来类比一个观察者模式的使用实例:
// 1. 创建被观察者
val btnTestRxJava=findViewById<View>(R.id.btnTestRxJava)
// 2. 创建被观察者并订阅
btnTestRxJava.setOnClickListener(this)
override fun onClick(v: View?) {
// 3. 被观察者变动,获得消息变动进行后续操作
if (v.id == R.id.btnTestRxJava) {
Toast.makeText(this, "按钮点了", Toast.LENGTH_SHORT).show()
}
}
流程很简单,分3步走:
- 创建被观察者。Button btnTestRxJava
- 创建被观察者并订阅。观察者是Activity
- 被观察者变动,获得消息变动进行后续操作。 当点击btnTestRxJava后Activity监听到事件进行后续操作
观察者和被观察者
观察者
有两种Observer,Subscriber。
Subscriber是对Observer的一种扩展,内部增加了OnStart方法,在事件未发送之前订阅,用于做一些准备工作,并且还有unsubscribe()用于取消订阅。
Observer的内部实现很简单,如想进一步阅读,请查看我的博客
《Rxjava源码分析之IO.Reactivex.Observer》
public interface Observer<@NonNull T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
被观察者
Observable有很多创建方式,如如create()、defer()、just()、from()、rang()、timer()、interval()等。
最常用的创建方式是create(),其参数是ObservableOnSubscribe,需要重写void subscribe(@NonNull ObservableEmitter emitter)方法。
val mObservable: Observable<String> = Observable.create {
it?.let {
this.emitter=it
it.onNext("Hello, world!")
it.onComplete()
}
}
subscribe()方法中的额程序只会在Observable被订阅时进行响应回调,它的响应在Observer#onSubscribe(Disposable d)方法之后。
使用案例
private fun simpleMode(){
val mObserver: Observer<String> = object : Observer<String> {
override fun onSubscribe(d: @NonNull Disposable?) {}
override fun onNext(s: @NonNull String) {}
override fun onError(e: @NonNull Throwable?) {}
override fun onComplete() {}
}
mObservable.subscribe(mObserver)
}
对于多次订阅的执行,ObservableOnSubscribe#subscribe(ObservableEmitter emitter)中的实例emitter会新生成一个,且Observer#onSubscribe(disposable: Disposable) 中的disposable也会重新生成新的。
mObservable.subscribe(mObserver)
mObservable.subscribe(mObserver)
mObservable.subscribe(mObserver)
不要对同一观察者和同一被观察者不要重复订阅
响应流程
Observable的状态改变后,数据是由ObservableEmitter通过onNext、onComplete、onError方法发射的,订阅了的Observer会响应对应的onNext、onComplete、onError方法。如执行方法:
mEmitter.onNext("Hello");
mEmitter.onComplete();
方法的响应顺序为
- ObservableEmitter.onNext
- Observer.onNext
- ObservableEmitter.onComplete
- Observer.onComplete
假设多次重复执行,
Observer.onNext 、 Observer.onComplete
是不会响应的。验证了Observer对同一状态改变只会响应一次。
mEmitter.onNext("Hello");
mEmitter.onComplete();
mEmitter.onNext("Hello");
mEmitter.onComplete();
mEmitter.onNext("Hello");
mEmitter.onComplete();
总结
本测试Demo源码
gitee:
https://gitee.com/luofaxin/RxJava3Analysis.git
github:
https://github.com/l424533553/RxJava3Analysis.git
相关链接
:
扩展链接:
-
Rxjava源码分析之IO.Reactivex.Observer
-
Rxjava源码分析之IO.Reactivex.CompositeDisposable
-
Rxjava源码分析之IO.Reactivex.Observable
扩展训练:
- 观察者模式的基本定义
- Rxjava的基本使用示例
- Rxjava的调用流程
博客书写不易,您的点赞收藏是我前进的动力,千万别忘记点赞、 收藏 ^ _ ^ !