Panda白话 Reactor -背压策略

  • Post author:
  • Post category:其他



上回书我们讲了元素采样和延迟响应操作符 – 传送门




五、背压策略

背压:下游消费速度跟不上上游发送数据速度,导致阻塞或异常,进而影响上游。

reactor提供了集中背压策略

  • onBackPressureBuffer – 顾名思义,来不及消费的数据先缓存在队列里
  • onBackPressureDrop – drop,丢掉,来不及消费的数据直接扔掉
  • onBackPressureLatest – 保留最新数据,一旦下游请求出现,立即推向下游
  • onBackPressureError – 下游消费速度跟不上,直接抛异常报错
  • limitRate(n) – 限速,上游一次最多发n个




onBackPressureBuffer- Reactor默认使用的背压策略

public final Flux<T> onBackpressureBuffer(int maxSize)


弹珠图:


在这里插入图片描述


demo:

  1. 创建一个流1,放从1递增的100个数据
  2. 每隔10毫秒从流1订阅一个元素,形成新的响应式流2
  3. 添加背压策略onBackpressureBuffer,设置一个50个元素大小的缓存区
  4. 每隔100毫秒从流2订阅一个元素,形成新的响应式流3
  5. 订阅响应式流3元素,

    • 数据信号onNext – 打印
    • 异常信号onError – 打印
    • 完成信号onComplete – 打印
  6. 线程阻塞5秒
Flux.range(1,100)
                .delayElements(Duration.ofMillis(10))
                .onBackpressureBuffer(50)
                .delayElements(Duration.ofMillis(100))
                .subscribe(
                        item -> System.out.print(item +" "),
                        ex -> System.out.println("onError: "+ex),
                        () -> System.out.println("onComplete")
                );
                Thread.sleep(5000);


打印结果:


订阅了8个元素就异常了,

因为缓存区设置了50个大小,不够用

在这里插入图片描述

画个图示意一下,更好理解:

在这里插入图片描述

Stream2隔10毫秒发射一个数据,Stream3隔100毫秒订阅一个数据,

Stream2欻欻欻扔了10个数据,Stream3才拿走一个,没订阅的就扔缓存队列里,

缓存50个数据满了时,Stream3才订阅了5个,

后面的数据缓存也没地了,消费速度也跟不上,直接抛异常了就


我们可以将缓存空间设置大一点就ok了




onBackPressureDrop- 来不及消费的数据我就扔掉


弹珠图:


发布者欻欻欻就是扔数据,订阅者来不及订阅的元素直接drop扔掉

在这里插入图片描述


demo:


步骤同上,只是背压策略换成onBackPressureDrop

 Flux.range(1,100)
                .delayElements(Duration.ofMillis(10))
                .onBackpressureDrop()
                .delayElements(Duration.ofMillis(100))
                .subscribe(
                        item -> System.out.print(item+" "),
                        ex -> System.out.println("onError: "+ex),
                        () -> System.out.println("onComplete")
                );
                Thread.sleep(5000);


打印结果:


可以看到,没有报错,打印了onComplete,说明是正常订阅完成

但是数据只订阅了32个,后面来不及订阅的68个全部扔掉了,嘎嘎

在这里插入图片描述



onBackPressureLast – 也是丢弃来不及订阅的数据,但是保留最新的


弹珠图


在这里插入图片描述


demo:


步骤同上,只是背压策略换成onBackpressureLatest

 Flux.range(1,100)
                .delayElements(Duration.ofMillis(10))
                .onBackpressureLatest()
                .delayElements(Duration.ofMillis(100))
                .subscribe(
                        item -> System.out.print(item+" "),
                        ex -> System.out.println("onError: "+ex),
                        () -> System.out.println("onComplete")
                );
                Thread.sleep(5000);


打印结果:


可以看到:

1、正常订阅完成

2、订阅到元素32速度就跟不上了,后面数据丢弃

3、但是保留了最新数据100,被订阅到

在这里插入图片描述



onBackpressureError – 下游速度跟不上、发错误信号


弹珠图:


在这里插入图片描述


demo:


步骤同上,只是背压策略换成onBackpressureError

 Flux.range(1,100)
                .delayElements(Duration.ofMillis(10))
                .onBackpressureError()
                .delayElements(Duration.ofMillis(100))
                .subscribe(
                        item -> System.out.print(item+" "),
                        ex -> System.out.println("onError: "+ex),
                        () -> System.out.println("onComplete")
                );
                Thread.sleep(5000);


打印结果:


可以看到,订阅了4个元素就报错了

在这里插入图片描述

错误信息: 流溢出异常:接收者被比预期更多的信号给累蒙圈了,哈哈

onError: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)



版权声明:本文为FightingITPanda原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。