Android RxJava3入门

  • Post author:
  • Post category:java




简介

RxJava3是Android的一个响应式编程框架。

Android的异步操作是用AsyncTask和Handler,但是当请求多了以后,代码的逻辑将会变得十分的复杂,这时候使用RxJava就可以让整个代码的逻辑变得清晰。

RxJava是基于事件流的链式调用,将链式编程风格和异步结合在一起。RxJava的原理其实就是:先创建一个

被观察者Observable

对象,然后使用各式各样的操作符建立链式操作,通过

订阅Subscribe

按顺序发送(一步步像流水线那样)事件给

观察者Observer



观察者Observer

按顺序接收事件并对事件作出相应的响应。


GitHub RxJava


RxJava2从2021年2月28日起就不会发生进一步的开发、支持、维护、更新等,但是RxJava2的最新版本2.2.21还是可以访问的。所以本文基于

RxJava3

(没学过1和2也可以直接学3啦)。



一些概念



观察者模式

因为RxJava的异步操作是通过扩展的

观察者模式

来实现的,所以需要先大致了解学习一下观察者模式。

观察者模式在GoF的《设计模式》—书中,它的定义是这样的:

Define a one-to-many dependency between objects so that when one

objectchanges state, all its dependents are notified and updated

automatically.

就是说在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。

其实就像我们追剧那样,电视剧可以当作

被观察者Observable

,我们可以当作

观察者Observer

,我们去订阅了电视剧,当电视剧出现新的更新的时候,会推送给我们,我们就不需要时时刻刻盯着电视剧看有没有更新,而是电视剧更新了会主动推送给我们。



背压Backpressure

在RxJava的使用中,由于RxJava的架构是观察者模式,被观察者Observable和观察者Observer的产生事件和处理事件的速度可能会有所差异,这样也许就会导致程序崩溃。

举个例子,你在看电视剧,你看电视剧的速度很快,但是电视剧更新的速度很慢,那你等电视剧更新了再看,这样其实是可以的。但是如果电视剧更新的速度很快,而你看电视剧的速度很慢,这样电视剧一集一集的就会累加起来,到最后,你就会积累了一堆没看的电视剧,你积压了电视剧也许不会崩溃,但是如果程序堆积了很多事件,最终就会挤爆内存,导致程序崩溃。

那怎么办呢?可以解决这种问题吗?

在上面的例子里面可以看见,电视剧(被观察者)是主动的推送数据给我们(观察者),我们是被动接收的。

所以背压其实就是在异步中,被观察者发送事件速度远快于观察者的处理速度的情况下,可以采取的一种控制流速的策略,就是告诉上游的被被观察者降低发送速度。

响应式拉取就是一种背压策略,可以让观察者可以根据自身实际情况去被观察者那里拉取数据,而不是被动接收被观察者发送的数据,就是告诉被观察者,慢一点慢一点,我电视剧看不完啦!可以让被观察者变成被动式,等待观察者的通知再发送数据。这样就实现了控制上游被观察者发送事件的速度,这就是背压,一种策略。



观察者模式种类

在RxJava3中的被观察者Observable和观察者Observer大致有:

● Observable(被观察者)/Observer(观察者)(Observeable用于订阅Observer)

Observable:发送0个或N个数据,不支持背压(所以使用的时候需要考虑数据量是否很大)。(原本是支持的,RxJava2.x后由Flowable支持,因此改成不支持背压)

● Flowable(被观察者)/Subscriber(观察者)(Flowable用于订阅Subscriber)

Flowable:发送0个或N个数据,支持背压。是RxJava2.x后的新类型

● Single(被观察者)/SingleObserver(观察者)

Single:只处理onSuccess和onError事件,只能发送单个数据或者发送一个错误

● Completable(被观察者)/CompletableObserver(观察者)

Completable:创建后不会发射任何数据,只处理onComplete和onError事件

● Maybe(被观察者)/MaybeObserve(观察者)

Maybe:能够发射0个或1个数据

本文只介绍了Observable(被观察者)/Observer(观察者)这一种类的基础使用



最基础的使用



配置

在build.gradle(:app)里添加依赖

在这里插入图片描述

rxjava是RxJava本体,rxandroid是RxJava在Android平台的扩展



创建被观察者Observable

    //创建被观察者Observable(电视剧)
    //使用create操作符从头开始创建一个Observable
    Observable<String> TVplay = Observable.create(new ObservableOnSubscribe<String>() {//ObservableOnSubscribe<T>其实就像一个队列一样
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {//ObservableEmitter可以理解为事件发射器
            //重写subscribe()定义需要发送的事件
            //ObservableEmitter有三种发射方法,onNext()、onComplete()、onError()
            //onNext()发送事件,可以无限调用,所有的发射信息观察者都可以收得到
            emitter.onNext("电视剧第一集");
            emitter.onNext("电视剧第二集");
            emitter.onNext("电视剧第三集");
            //onComplete()、onError()是互斥的,观察者只会收到一个
            //onComplete()完成事件发送,可以重复调用,但是观察者只会接收一次
            emitter.onComplete();
            //onError()异常发送,不可以重复调用
            //emitter.onError(new IllegalArgumentException("TVplay Exception!"));
        }
    });
    //RxJava还提供了其他方法用于创建被观察者对象Observable
    //just(T...):直接将传入的参数依次发送出来(将一个或多个对象转换成发射这个或这些对象的一个Observable)
    //    Observable<String> observable = Observable.just("电视剧第一集", "电视剧第二集", "电视剧第三集");//这个跟上面的作用是一样的
    //还有很多,比如from等...create、just、from其实就是RxJava的操作符,可以期待本人的RxJava操作符文章

值得注意的是,在创建Observable的时候,需要选择rxjava3的,不要选成util包的,这是安卓自带的观察者模式。

在这里插入图片描述



创建观察者Observer

    //创建观察者Observer(观影者)
    Observer<String> viewer = new Observer<String>() {
        //定义Disposable类变量
        //private Disposable mDisposable;

        //事件队列开始
        @Override
        public void onSubscribe(@NonNull Disposable d) {//Disposable可以当作订阅关系
            //当订阅时被调用
            Log.e("RxJava_TAG", "onSubscribe");
            //取消/解除订阅
            //d.dispose();
            //是否已经解除订阅
            //d.isDisposed();
            //可以这样操作
            //mDisposable = d;
        }

        //普通事件
        @Override
        public void onNext(@NonNull String s) {
            //发送数据时被多次调用
            Log.e("RxJava_TAG", "onNext" + s);
            //取消/解除订阅实例
            //if ("电视剧第二集".equals(s)) {
            //    //设置在接收到"电视剧第三集"事件后切断观察者和被观察者的连接
            //    mDisposable.dispose();
            //    Log.d("RxJava_TAG", "是否已经切断了连接:" + mDisposable.isDisposed());
            //}
        }

        //事件队列异常事件
        @Override
        public void onError(@NonNull Throwable e) {
            //在事件处理过程中出现异常时,onError()方法会被触发
            Log.e("RxJava_TAG", "onError");
        }

        //事件队列完结事件
        @Override
        public void onComplete() {
            //RxJava把所有事件都当作队列处理
            //当被观察者不再发送onNext()普通事件的时候
            Log.e("RxJava_TAG", "onComplete");
        }
    };



订阅Subscribe

        //订阅Subscribe(关系绑定)
        //当被观察者Observable被订阅时,即事件序列就会依照设定依次被触发
        //即观察者Observer会依次调用对应事件的复写方法从而响应事件
        //从而实现被观察者Observable调用了观察者Observer的回调方法(或者说被观察者向观察者的事件传递)
        TVplay.subscribe(viewer);

番外:这个订阅是被观察者订阅观察者,其实是因为RxJava链式编程的缘故,其实就是被观察者被观察者订阅啦。



运行结果

在这里插入图片描述

可以看见先调用了onSubscribe(),再调用三个onNext(),最后调用了onComplete()

在创建观察者的时候,在onSubscribe()方法可以看见Disposable(如下图蓝标代码),可以把Disposable当作订阅关系

在这里插入图片描述

把图中标红的代码放开得到的运行结果为:

在这里插入图片描述



基于事件流的链式调用(简洁使用)

上面介绍的其实是最基本的写法,只是为了教学才写成那样的,一般在项目中,都采用通过链式调用

就是把创建被观察者、创建观察者、订阅这三个步骤采用链式调用写在一起

        //更简洁的方法 基于事件流的链式调用
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("电影第一部");
                emitter.onNext("电影第二部");
                emitter.onNext("电影第三部");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJava_TAG", "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.e("RxJava_TAG", "onNext" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJava_TAG", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("RxJava_TAG", "onComplete");
            }
        });
    }

运行结果:

在这里插入图片描述



全部代码

public class RxJavaActivity extends AppCompatActivity {

    public static void start(Context context) {
        Intent starter = new Intent(context, RxJavaActivity.class);
        context.startActivity(starter);
    }

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);

        //订阅Subscribe(关系绑定)
        //当被观察者Observable被订阅时,即事件序列就会依照设定依次被触发
        //即观察者Observer会依次调用对应事件的复写方法从而响应事件
        //从而实现被观察者Observable调用了观察者Observer的回调方法(或者说被观察者向观察者的事件传递)
        TVplay.subscribe(viewer);

        //更简洁的方法 基于事件流的链式调用
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("电影第一部");
                emitter.onNext("电影第二部");
                emitter.onNext("电影第三部");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJava_TAG", "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.e("RxJava_TAG", "onNext" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJava_TAG", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("RxJava_TAG", "onComplete");
            }
        });
    }

    //创建被观察者Observable(电视剧)
    //使用create操作符从头开始创建一个Observable
    Observable<String> TVplay = Observable.create(new ObservableOnSubscribe<String>() {//ObservableOnSubscribe<T>其实就像一个队列一样
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {//ObservableEmitter可以理解为事件发射器
            //重写subscribe()定义需要发送的事件
            //ObservableEmitter有三种发射方法,onNext()、onComplete()、onError()
            //onNext()发送事件,可以无限调用,所有的发射信息观察者都可以收得到
            emitter.onNext("电视剧第一集");
            emitter.onNext("电视剧第二集");
            emitter.onNext("电视剧第三集");
            //onComplete()、onError()是互斥的,观察者只会收到一个
            //onComplete()完成事件发送,可以重复调用,但是观察者只会接收一次
            emitter.onComplete();
            //onError()异常发送,不可以重复调用
            //emitter.onError(new IllegalArgumentException("TVplay Exception!"));
        }
    });
    //RxJava还提供了其他方法用于创建被观察者对象Observable
    //just(T...):直接将传入的参数依次发送出来(将一个或多个对象转换成发射这个或这些对象的一个Observable)
    //    Observable<String> observable = Observable.just("电视剧第一集", "电视剧第二集", "电视剧第三集");//这个跟上面的作用是一样的
    //还有很多,比如from等...

    //创建观察者Observer(观影者)
    Observer<String> viewer = new Observer<String>() {
        //定义Disposable类变量
        //private Disposable mDisposable;

        //事件队列开始
        @Override
        public void onSubscribe(@NonNull Disposable d) {//Disposable可以当作订阅关系
            //当订阅时被调用
            Log.e("RxJava_TAG", "onSubscribe");
            //取消/解除订阅
            //d.dispose();
            //是否已经解除订阅
            //d.isDisposed();
            //可以这样操作
            //mDisposable = d;
        }

        //普通事件
        @Override
        public void onNext(@NonNull String s) {
            //发送数据时被多次调用
            Log.e("RxJava_TAG", "onNext" + s);
            //取消/解除订阅实例
            //if ("电视剧第二集".equals(s)) {
            //    //设置在接收到"电视剧第三集"事件后切断观察者和被观察者的连接
            //    mDisposable.dispose();
            //    Log.d("RxJava_TAG", "是否已经切断了连接:" + mDisposable.isDisposed());
            //}
        }

        //事件队列异常事件
        @Override
        public void onError(@NonNull Throwable e) {
            //在事件处理过程中出现异常时,onError()方法会被触发
            Log.e("RxJava_TAG", "onError");
        }

        //事件队列完结事件
        @Override
        public void onComplete() {
            //RxJava把所有事件都当作队列处理
            //当被观察者不再发送onNext()普通事件的时候
            Log.e("RxJava_TAG", "onComplete");
        }
    };
}



版权声明:本文为qq_43680027原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。