RxJava_3_Observable & Schedulers

  • Post author:
  • Post category:java


Observable 即为被观察者, Schedulers 是用于生成 Scheduler (订阅)实例对象的。

它们配合使用,还可以达到类似Android 的Handler 效果 (

延迟发送

以及

指定

观察者 或 被观察者的

线

程)

例如:

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

public class ObserverOnTest {
    private static final long DELAY_TIME = 5;

    public static void main(String[] args) {
        System.out.println("==========main time=" + DateTimeTest.getCurrentTime());
        Observable<Long> testObservable = Observable.timer(DELAY_TIME, TimeUnit.SECONDS).observeOn(Schedulers.newThread());
        //testObservable.subscribe(o -> startDoSomething());
        System.out.println("==========main current thread=" + Thread.currentThread().getName());
        testObservable.subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                startDoSomething();
            }
        });

        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void startDoSomething() {
        System.out.println("==========startDoSomething time=" + DateTimeTest.getCurrentTime());
        System.out.println("==========startDoSomething current thread=" + Thread.currentThread().getName());
    }
}

其中, DateTimeTest.getCurrentTime() 为自己实现的一个简单获取时间的方法,用于查看延迟时间 ,可忽略。

输出:

> Task :javatest:ObserverOnTest.main()
==========main time=01-04 14:51:35
==========main current thread=main
==========startDoSomething time=01-04 14:51:40
==========startDoSomething current thread=RxNewThreadScheduler-1

可以看到, 通过

.observeOn(Schedulers.newThread())

指定了观察者的线程,即创建一个新的子线程。

testObservable.subscribe()  里面回调的方法则是在上面创建的子线程中执行。



版权声明:本文为whjk20原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。