RxJava3
简介
RxJava3是Android的一个响应式编程框架。
Android的异步操作是用AsyncTask和Handler,但是当请求多了以后,代码的逻辑将会变得十分的复杂,这时候使用RxJava就可以让整个代码的逻辑变得清晰。
RxJava是基于事件流的链式调用,将链式编程风格和异步结合在一起。RxJava的原理其实就是:先创建一个
被观察者Observable
对象,然后使用各式各样的操作符建立链式操作,通过
订阅Subscribe
按顺序发送(一步步像流水线那样)事件给
观察者Observer
,
观察者Observer
按顺序接收事件并对事件作出相应的响应。
GitHub RxJava
RxJava2从2021年2月28日起就不会发生进一步的开发、支持、维护、更新等,但是RxJava2的最新版本2.2.21还是可以访问的。所以本文基于
RxJava3
(没学过1和2也可以直接学3啦)。
一些概念
观察者模式
因为RxJava的异步操作是通过扩展的
观察者模式
来实现的,所以需要先大致了解学习一下观察者模式。
观察者模式在GoF的《设计模式》—书中,它的定义是这样的:
Define a one-to-many dependency between objects so that when one
objectchanges state, all its dependents are notified and updated
automatically.
就是说在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。
一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。
其实就像我们追剧那样,电视剧可以当作
被观察者Observable
,我们可以当作
观察者Observer
,我们去订阅了电视剧,当电视剧出现新的更新的时候,会推送给我们,我们就不需要时时刻刻盯着电视剧看有没有更新,而是电视剧更新了会主动推送给我们。
背压Backpressure
在RxJava的使用中,由于RxJava的架构是观察者模式,被观察者Observable和观察者Observer的产生事件和处理事件的速度可能会有所差异,这样也许就会导致程序崩溃。
举个例子,你在看电视剧,你看电视剧的速度很快,但是电视剧更新的速度很慢,那你等电视剧更新了再看,这样其实是可以的。但是如果电视剧更新的速度很快,而你看电视剧的速度很慢,这样电视剧一集一集的就会累加起来,到最后,你就会积累了一堆没看的电视剧,你积压了电视剧也许不会崩溃,但是如果程序堆积了很多事件,最终就会挤爆内存,导致程序崩溃。
那怎么办呢?可以解决这种问题吗?
在上面的例子里面可以看见,电视剧(被观察者)是主动的推送数据给我们(观察者),我们是被动接收的。
所以背压其实就是在异步中,被观察者发送事件速度远快于观察者的处理速度的情况下,可以采取的一种控制流速的策略,就是告诉上游的被被观察者降低发送速度。
响应式拉取就是一种背压策略,可以让观察者可以根据自身实际情况去被观察者那里拉取数据,而不是被动接收被观察者发送的数据,就是告诉被观察者,慢一点慢一点,我电视剧看不完啦!可以让被观察者变成被动式,等待观察者的通知再发送数据。这样就实现了控制上游被观察者发送事件的速度,这就是背压,一种策略。
观察者模式种类
在RxJava3中的被观察者Observable和观察者Observer大致有:
● Observable(被观察者)/Observer(观察者)(Observeable用于订阅Observer)
Observable:发送0个或N个数据,不支持背压(所以使用的时候需要考虑数据量是否很大)。(原本是支持的,RxJava2.x后由Flowable支持,因此改成不支持背压)
● Flowable(被观察者)/Subscriber(观察者)(Flowable用于订阅Subscriber)
Flowable:发送0个或N个数据,支持背压。是RxJava2.x后的新类型
● Single(被观察者)/SingleObserver(观察者)
Single:只处理onSuccess和onError事件,只能发送单个数据或者发送一个错误
● Completable(被观察者)/CompletableObserver(观察者)
Completable:创建后不会发射任何数据,只处理onComplete和onError事件
● Maybe(被观察者)/MaybeObserve(观察者)
Maybe:能够发射0个或1个数据
本文只介绍了Observable(被观察者)/Observer(观察者)这一种类的基础使用
最基础的使用
配置
在build.gradle(:app)里添加依赖
rxjava是RxJava本体,rxandroid是RxJava在Android平台的扩展
创建被观察者Observable
//创建被观察者Observable(电视剧)
//使用create操作符从头开始创建一个Observable
Observable<String> TVplay = Observable.create(new ObservableOnSubscribe<String>() {//ObservableOnSubscribe<T>其实就像一个队列一样
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {//ObservableEmitter可以理解为事件发射器
//重写subscribe()定义需要发送的事件
//ObservableEmitter有三种发射方法,onNext()、onComplete()、onError()
//onNext()发送事件,可以无限调用,所有的发射信息观察者都可以收得到
emitter.onNext("电视剧第一集");
emitter.onNext("电视剧第二集");
emitter.onNext("电视剧第三集");
//onComplete()、onError()是互斥的,观察者只会收到一个
//onComplete()完成事件发送,可以重复调用,但是观察者只会接收一次
emitter.onComplete();
//onError()异常发送,不可以重复调用
//emitter.onError(new IllegalArgumentException("TVplay Exception!"));
}
});
//RxJava还提供了其他方法用于创建被观察者对象Observable
//just(T...):直接将传入的参数依次发送出来(将一个或多个对象转换成发射这个或这些对象的一个Observable)
// Observable<String> observable = Observable.just("电视剧第一集", "电视剧第二集", "电视剧第三集");//这个跟上面的作用是一样的
//还有很多,比如from等...create、just、from其实就是RxJava的操作符,可以期待本人的RxJava操作符文章
值得注意的是,在创建Observable的时候,需要选择rxjava3的,不要选成util包的,这是安卓自带的观察者模式。
创建观察者Observer
//创建观察者Observer(观影者)
Observer<String> viewer = new Observer<String>() {
//定义Disposable类变量
//private Disposable mDisposable;
//事件队列开始
@Override
public void onSubscribe(@NonNull Disposable d) {//Disposable可以当作订阅关系
//当订阅时被调用
Log.e("RxJava_TAG", "onSubscribe");
//取消/解除订阅
//d.dispose();
//是否已经解除订阅
//d.isDisposed();
//可以这样操作
//mDisposable = d;
}
//普通事件
@Override
public void onNext(@NonNull String s) {
//发送数据时被多次调用
Log.e("RxJava_TAG", "onNext" + s);
//取消/解除订阅实例
//if ("电视剧第二集".equals(s)) {
// //设置在接收到"电视剧第三集"事件后切断观察者和被观察者的连接
// mDisposable.dispose();
// Log.d("RxJava_TAG", "是否已经切断了连接:" + mDisposable.isDisposed());
//}
}
//事件队列异常事件
@Override
public void onError(@NonNull Throwable e) {
//在事件处理过程中出现异常时,onError()方法会被触发
Log.e("RxJava_TAG", "onError");
}
//事件队列完结事件
@Override
public void onComplete() {
//RxJava把所有事件都当作队列处理
//当被观察者不再发送onNext()普通事件的时候
Log.e("RxJava_TAG", "onComplete");
}
};
订阅Subscribe
//订阅Subscribe(关系绑定)
//当被观察者Observable被订阅时,即事件序列就会依照设定依次被触发
//即观察者Observer会依次调用对应事件的复写方法从而响应事件
//从而实现被观察者Observable调用了观察者Observer的回调方法(或者说被观察者向观察者的事件传递)
TVplay.subscribe(viewer);
番外:这个订阅是被观察者订阅观察者,其实是因为RxJava链式编程的缘故,其实就是被观察者被观察者订阅啦。
运行结果
可以看见先调用了onSubscribe(),再调用三个onNext(),最后调用了onComplete()
在创建观察者的时候,在onSubscribe()方法可以看见Disposable(如下图蓝标代码),可以把Disposable当作订阅关系
把图中标红的代码放开得到的运行结果为:
基于事件流的链式调用(简洁使用)
上面介绍的其实是最基本的写法,只是为了教学才写成那样的,一般在项目中,都采用通过链式调用
就是把创建被观察者、创建观察者、订阅这三个步骤采用链式调用写在一起
//更简洁的方法 基于事件流的链式调用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("电影第一部");
emitter.onNext("电影第二部");
emitter.onNext("电影第三部");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJava_TAG", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("RxJava_TAG", "onNext" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJava_TAG", "onError");
}
@Override
public void onComplete() {
Log.e("RxJava_TAG", "onComplete");
}
});
}
运行结果:
全部代码
public class RxJavaActivity extends AppCompatActivity {
public static void start(Context context) {
Intent starter = new Intent(context, RxJavaActivity.class);
context.startActivity(starter);
}
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
//订阅Subscribe(关系绑定)
//当被观察者Observable被订阅时,即事件序列就会依照设定依次被触发
//即观察者Observer会依次调用对应事件的复写方法从而响应事件
//从而实现被观察者Observable调用了观察者Observer的回调方法(或者说被观察者向观察者的事件传递)
TVplay.subscribe(viewer);
//更简洁的方法 基于事件流的链式调用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("电影第一部");
emitter.onNext("电影第二部");
emitter.onNext("电影第三部");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJava_TAG", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("RxJava_TAG", "onNext" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJava_TAG", "onError");
}
@Override
public void onComplete() {
Log.e("RxJava_TAG", "onComplete");
}
});
}
//创建被观察者Observable(电视剧)
//使用create操作符从头开始创建一个Observable
Observable<String> TVplay = Observable.create(new ObservableOnSubscribe<String>() {//ObservableOnSubscribe<T>其实就像一个队列一样
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {//ObservableEmitter可以理解为事件发射器
//重写subscribe()定义需要发送的事件
//ObservableEmitter有三种发射方法,onNext()、onComplete()、onError()
//onNext()发送事件,可以无限调用,所有的发射信息观察者都可以收得到
emitter.onNext("电视剧第一集");
emitter.onNext("电视剧第二集");
emitter.onNext("电视剧第三集");
//onComplete()、onError()是互斥的,观察者只会收到一个
//onComplete()完成事件发送,可以重复调用,但是观察者只会接收一次
emitter.onComplete();
//onError()异常发送,不可以重复调用
//emitter.onError(new IllegalArgumentException("TVplay Exception!"));
}
});
//RxJava还提供了其他方法用于创建被观察者对象Observable
//just(T...):直接将传入的参数依次发送出来(将一个或多个对象转换成发射这个或这些对象的一个Observable)
// Observable<String> observable = Observable.just("电视剧第一集", "电视剧第二集", "电视剧第三集");//这个跟上面的作用是一样的
//还有很多,比如from等...
//创建观察者Observer(观影者)
Observer<String> viewer = new Observer<String>() {
//定义Disposable类变量
//private Disposable mDisposable;
//事件队列开始
@Override
public void onSubscribe(@NonNull Disposable d) {//Disposable可以当作订阅关系
//当订阅时被调用
Log.e("RxJava_TAG", "onSubscribe");
//取消/解除订阅
//d.dispose();
//是否已经解除订阅
//d.isDisposed();
//可以这样操作
//mDisposable = d;
}
//普通事件
@Override
public void onNext(@NonNull String s) {
//发送数据时被多次调用
Log.e("RxJava_TAG", "onNext" + s);
//取消/解除订阅实例
//if ("电视剧第二集".equals(s)) {
// //设置在接收到"电视剧第三集"事件后切断观察者和被观察者的连接
// mDisposable.dispose();
// Log.d("RxJava_TAG", "是否已经切断了连接:" + mDisposable.isDisposed());
//}
}
//事件队列异常事件
@Override
public void onError(@NonNull Throwable e) {
//在事件处理过程中出现异常时,onError()方法会被触发
Log.e("RxJava_TAG", "onError");
}
//事件队列完结事件
@Override
public void onComplete() {
//RxJava把所有事件都当作队列处理
//当被观察者不再发送onNext()普通事件的时候
Log.e("RxJava_TAG", "onComplete");
}
};
}