RxJava中遇到的坑
1.线程无限创建
在轮询的请求中使用.subscribeOn(Schedulers.io())可能会导致无限创建线程问题。
Schedulers是RxJava的主要组件之一。负责在不同线程上执行Observable的操作,便于将耗时任务分摊到其他线程。
我们这里需要连了解下Schedulers下的不同的Scheduler的属性和使用:
IOScheduler
最常见的调度器之一。用于IO相关操作。比如网络请求和文件操作。IO 调度器背后由线程池支撑。
它首先创建一个工作线程,可以复用于其他操作。当然,当这个工作线程(长时间任务的情况)不能被复用时,会创建一个新的线程来处理其他操作。这个好处也会带来一些问题,因为它允许创建的线程是无限的,所以当创建大量线程的时候,会对整体性能造成一些影响。
subscribeOn(Schedulers.io())
ComputationScheduler
作为计算密集型的 Scheduler,Schedulers.computation( )用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());ComputationScheduler的线程数是与 CPU 核心密切相关的,原因是当线程数远远超过 CPU 核心数目时,CPU 的时间更多的损耗在了线程的上下文切换,默认线程数等于处理器的数量
subscribeOn(Schedulers.computation())
Single
由一个线程支持。所以无论有多少个Observable,都将只运行在这个线程上。
subscribeOn(Schedulers.single())
Immediate
此调度器在当前活跃线程以阻塞的方式开始其任务(rxjava2已将其移除),无视当前运行的任务。使用RxJava2的可以忽略。
subscribeOn(Schedulers.immediate())
Trampoline
此调度器运行在当前线程,所以如果你有代码运行在主线程上,它会将将要运行的代码块添加到主线程的队列。和Immediate非常相似,不同的是Immediate会阻塞此线程,而Trampoline会等待当前任务执行完成。适用用于不止一个Observable,并且希望它们能够按照顺序执行的场景。
subscribeOn(Schedulers.trampoline())
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.trampoline())
.subscribe(onNext);
Observable.just( 5,6, 7,8, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(onNext);
Output:
Number = 1
Number = 2
Number = 3
Number = 4
Number = 5
Number = 6
Number = 7
Number = 8
Number = 9
Executor Scheduler()
更像是一种自定义的IO调度器。我们可以通过制定线程池的大小来创建一个自定义的线程池。适用于Observable的数量对于IO调度器太多的场景使用,使用如下:
Scheduler ioOne = Schedulers.from(Executors.newFixedThreadPool(5));
Scheduler fixScheduler = Schedulers.from(ioOne)
subscribeOn(fixScheduler)
Android Scheduler
由rxAndroid库提供,用于将操作切换到主线程以便操作ui,经常用于observeOn方法。使用如下:
AndroidSchedulers.mainThread()
observeOn(AndroidSchedulers.mainThread())
之所以在轮询请求情况下回出现无限创建线程和IOScheduler的实现密切相关,当任务一直在执行或者来不及释放之前的线程就会创建新的线程。所以很极限的情况就是轮训时间过短到之前一个的线程还没有完成任务释放出来,就会先创建一个线程出来,这个时候我们应该使用Executor Scheduler调度器自定义线程池的个数。
Scheduler ioOne = Schedulers.from(Executors.newFixedThreadPool(5));
Scheduler fixScheduler = Schedulers.from(ioOne)
2.Interval和操作符一起使用操作符中操作发生异常会中断整个Interval
Observable.interval(tripInterval, TimeUnit.SECONDS)
.flatMap((Function<Long, ObservableSource<Response<TripResult>>>) aLong ->
AppDelegateImpl.getInstance()
.getInfoService()
.getTripInfo())
.subscribeOn(SchedulerPoolHelper.getInstance().getIoTrip())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new NetConsumer<TripResult>() {
@Override
public void onSuccess(TripResult result) {
processTripState(result);
}
}, throwable -> {
LogUtils.d("Trip: " + throwable.getMessage());
});
比如说:我每隔tripInterval秒回去请求一次接口,我们把请求接口放到中间的flatMap操作符中,一旦因为请求超时或者是一次脏数据导致请求失败,则整个interval都会中断。
这个问题导致我们的请求可能由于网络较差失败过一次后,就不在有请求了。
所以这种情况我是把请求嵌套在了Consumer的onSuccess中了,这样就不会中断了。
Observable.interval(tripInterval, TimeUnit.SECONDS)
.subscribeOn(SchedulerPoolHelper.getInstance().getIoTrip())
.subscribe(aLong -> {
AppDelegateImpl.getInstance().getInfoService()
.getTripInfo()
.subscribeOn(SchedulerPoolHelper.getInstance().getIoTrip())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new NetConsumer<TripResult>() {
@Override
public void onSuccess(TripResult result) {
processTripState(result);
}
}, throwable -> LogUtils.d(TAG, "Trip: " + throwable.getMessage()));
});
查阅了一些资料没有发现好的解决方案,如果大佬们有不同的简洁可以在留言区留言提出不同的方案。