重温RxJava(使用Kotlin编写)

  • Post author:
  • Post category:java


前言

使用RxJava挺久的了,但是这后面接触的少,所以有一些淡忘了,于是今天再一次总结了一下,刚好最近都在用Kotlin,所以以下代码是使用Kotlin编写的。

RxJava可以说是这样的:

  • 异步:可以很方便地切换线程
  • 简洁:在复杂的逻辑中保持代码简洁

基本使用

1.创建一个Observer

  • onNext:观察数据
  • onError:事件队列失败调用的方法
  • onCompleted:事件队列完成的方法

    onError和onCompleted只能有一个被调用


Subscriber

:是Observer的一个子类,使用它观察的时候,最开始会调用onStart方法,其他方法不变

2.创建Observable:

val observable = Observable.create(Observable.OnSubscribe<String> {
    //这里的it为observer
    it.onNext("1")
    it.onNext("2")
    it.onNext("3")
    it.onCompleted()
})

3.开始监听

observable.subscribe(observer)

当调用subscribe方法后,如果观察者是Observer,那么会将它封装到一个Subscriber中。然后最后会调用OnSubscribe的call方法。

4.just方法

//相当于多次调用Subscriber对象的onNext()方法
val observable = Observable.just("1", "2", "...")

5.from方法

val A= arrayOf("1","2","3")
val observable = Observable.from(A)

效果跟just相同

6.Action

val nextAction = Action1<String> {
    //call方法
    Log.d(TAG, "next:$it")
}

val errorAction = Action1<Throwable> {
    //call方法
    Log.d(TAG, "error:${it.message}")
}

val completeAction = Action0 {
    //call方法
    Log.d(TAG, "complete:")
}

observable.subscribe(nextAction)//当onNext方法
observable.subscribe(nextAction, errorAction)//当onNext、onError方法
observable.subscribe(nextAction, errorAction, completeAction)//当onNext、onError、onComplete方法

变换

1.map方法(适合一对一的转换)

//将图片的路径转换为bitmap
Observable.just("imagUrl.jpg")
        .map(object : Func1<String, Bitmap> {
            override fun call(t: String?): Bitmap {
                return getBitmap(t!!)
            }
        }).subscribe(object : Action1<Bitmap> {
            override fun call(t: Bitmap?) {
                //显示bitmap
            }
        })

2.flatmap(适合一对多的转换)

val subModels1 = arrayOf("11", "12")
val subModels2 = arrayOf("21", "22")
val models = arrayOf(Model(subModels1), Model(subModels2))

Observable.from(models)
        .flatMap(object : Func1<Model, Observable<String>> {
            override fun call(t: Model?): Observable<String> {
                //逐个数据发射
                return Observable.from(t?.subModels)
            }
        }).subscribe {
    Log.d(TAG, "next:$it")
}

创建Observable,但并不发送Observable对象,而是激活然后将激活的Observable的数据汇集到同一个Observable进行发送。


大致的原理:

创建一个新的Observable(代理),用于接收原Observable,然后发送给新的Subscriber(代理),新的Subscriber会将结果发送给目标Subscriber(我们创建的)。

线程控制

  • Schedulers.immediate(): 默认,直接在当前线程中执行。
  • Schedulers.newThread(): 启动新线程
  • Schedulers.io(): 启动IO线程
  • Schedulers.computation(): 密集计算使用的线程
  • AndroidSchedulers.mainThread(): 在Android的主线程中使用
Observable.just("url1", "url2", "url3")
        .map {
            getBitmap(it)
        }.subscribeOn(Schedulers.io())//io线程转换
        .observeOn(AndroidSchedulers.mainThread())//在主线程中显示bitmap
        .subscribe {
            //显示bitmap
        }
  • subscribeOn():只能指定一次,它的切换发生在OnSubscribe中,后面指定无效。
  • observeOn():可以指定多次,指定后在它后面的操作都是在该指定的线程中执行的。控制在它后面的线程。

与Retrofit结合使用

1.封装Observer

abstract class BaseObserver<T> : Subscriber<T>() {
    override fun onError(e: Throwable?) {
        onFail(e)
        onEnd()
    }

    override fun onNext(t: T) {
        onSuccess(t)
    }

    override fun onCompleted() {
        onEnd()
    }

    /**
     * 结束回调
     */
    abstract fun onEnd()

    /**
     * 请求成功回调
     */
    abstract fun onSuccess(data: T)

    /**
     * 请求失败回调
     */
    abstract fun onFail(error: Throwable?)
}

2.定义接口

interface Api {
    /**
     * 假设根据id获取用户信息
     */
    @GET("path")
    fun getUser(@Query("id") id: Int): Observable<User>
}

3.创建接口实例,并发送请求

//创建retrofit对象时,需要添加:addCallAdapterFactory(RxJava2CallAdapterFactory.create()),可让api方
//法的返回类型转换为Observable<T>
fun getUser(id: Int) {
    api.getUser(id).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : BaseObserver<User>() {
                override fun onStart() {
                    super.onStart()
                    //toDo:显示加载中
                }

                override fun onEnd() {
                    //toDo:做相关的操作,比如让加载提示消失
                }

                override fun onSuccess(data: User) {
                    //toDo:更新UI
                }

                override fun onFail(error: Throwable?) {
                    //toDo:提示获取失败
                }
            })
}

以上便是自己对RxJava的二次学习,没有很深,还有一些没有写出来,但在平常中以上的知识点可以用到挺多场景中。



版权声明:本文为A_sendy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。