Observable 即为被观察者, Schedulers 是用于生成 Scheduler (订阅)实例对象的。
它们配合使用,还可以达到类似Android 的Handler 效果 (
延迟发送
以及
指定
观察者 或 被观察者的
线
程)
例如:
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
public class ObserverOnTest {
private static final long DELAY_TIME = 5;
public static void main(String[] args) {
System.out.println("==========main time=" + DateTimeTest.getCurrentTime());
Observable<Long> testObservable = Observable.timer(DELAY_TIME, TimeUnit.SECONDS).observeOn(Schedulers.newThread());
//testObservable.subscribe(o -> startDoSomething());
System.out.println("==========main current thread=" + Thread.currentThread().getName());
testObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
startDoSomething();
}
});
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void startDoSomething() {
System.out.println("==========startDoSomething time=" + DateTimeTest.getCurrentTime());
System.out.println("==========startDoSomething current thread=" + Thread.currentThread().getName());
}
}
其中, DateTimeTest.getCurrentTime() 为自己实现的一个简单获取时间的方法,用于查看延迟时间 ,可忽略。
输出:
> Task :javatest:ObserverOnTest.main()
==========main time=01-04 14:51:35
==========main current thread=main
==========startDoSomething time=01-04 14:51:40
==========startDoSomething current thread=RxNewThreadScheduler-1
可以看到, 通过
.observeOn(Schedulers.newThread())
指定了观察者的线程,即创建一个新的子线程。
testObservable.subscribe() 里面回调的方法则是在上面创建的子线程中执行。
版权声明:本文为whjk20原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。