五、背压策略
背压:下游消费速度跟不上上游发送数据速度,导致阻塞或异常,进而影响上游。
reactor提供了集中背压策略
- onBackPressureBuffer – 顾名思义,来不及消费的数据先缓存在队列里
- onBackPressureDrop – drop,丢掉,来不及消费的数据直接扔掉
- onBackPressureLatest – 保留最新数据,一旦下游请求出现,立即推向下游
- onBackPressureError – 下游消费速度跟不上,直接抛异常报错
- limitRate(n) – 限速,上游一次最多发n个
onBackPressureBuffer- Reactor默认使用的背压策略
public final Flux<T> onBackpressureBuffer(int maxSize)
弹珠图:
demo:
- 创建一个流1,放从1递增的100个数据
- 每隔10毫秒从流1订阅一个元素,形成新的响应式流2
- 添加背压策略onBackpressureBuffer,设置一个50个元素大小的缓存区
- 每隔100毫秒从流2订阅一个元素,形成新的响应式流3
-
订阅响应式流3元素,
- 数据信号onNext – 打印
- 异常信号onError – 打印
- 完成信号onComplete – 打印
- 线程阻塞5秒
Flux.range(1,100)
.delayElements(Duration.ofMillis(10))
.onBackpressureBuffer(50)
.delayElements(Duration.ofMillis(100))
.subscribe(
item -> System.out.print(item +" "),
ex -> System.out.println("onError: "+ex),
() -> System.out.println("onComplete")
);
Thread.sleep(5000);
打印结果:
订阅了8个元素就异常了,
因为缓存区设置了50个大小,不够用
画个图示意一下,更好理解:
Stream2隔10毫秒发射一个数据,Stream3隔100毫秒订阅一个数据,
Stream2欻欻欻扔了10个数据,Stream3才拿走一个,没订阅的就扔缓存队列里,
缓存50个数据满了时,Stream3才订阅了5个,
后面的数据缓存也没地了,消费速度也跟不上,直接抛异常了就
我们可以将缓存空间设置大一点就ok了
onBackPressureDrop- 来不及消费的数据我就扔掉
弹珠图:
发布者欻欻欻就是扔数据,订阅者来不及订阅的元素直接drop扔掉
demo:
步骤同上,只是背压策略换成onBackPressureDrop
Flux.range(1,100)
.delayElements(Duration.ofMillis(10))
.onBackpressureDrop()
.delayElements(Duration.ofMillis(100))
.subscribe(
item -> System.out.print(item+" "),
ex -> System.out.println("onError: "+ex),
() -> System.out.println("onComplete")
);
Thread.sleep(5000);
打印结果:
可以看到,没有报错,打印了onComplete,说明是正常订阅完成
但是数据只订阅了32个,后面来不及订阅的68个全部扔掉了,嘎嘎
onBackPressureLast – 也是丢弃来不及订阅的数据,但是保留最新的
弹珠图
demo:
步骤同上,只是背压策略换成onBackpressureLatest
Flux.range(1,100)
.delayElements(Duration.ofMillis(10))
.onBackpressureLatest()
.delayElements(Duration.ofMillis(100))
.subscribe(
item -> System.out.print(item+" "),
ex -> System.out.println("onError: "+ex),
() -> System.out.println("onComplete")
);
Thread.sleep(5000);
打印结果:
可以看到:
1、正常订阅完成
2、订阅到元素32速度就跟不上了,后面数据丢弃
3、但是保留了最新数据100,被订阅到
onBackpressureError – 下游速度跟不上、发错误信号
弹珠图:
demo:
步骤同上,只是背压策略换成onBackpressureError
Flux.range(1,100)
.delayElements(Duration.ofMillis(10))
.onBackpressureError()
.delayElements(Duration.ofMillis(100))
.subscribe(
item -> System.out.print(item+" "),
ex -> System.out.println("onError: "+ex),
() -> System.out.println("onComplete")
);
Thread.sleep(5000);
打印结果:
可以看到,订阅了4个元素就报错了
错误信息: 流溢出异常:接收者被比预期更多的信号给累蒙圈了,哈哈
onError: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)