【基础】Reactor 响应式编程

  • Post author:
  • Post category:其他




Reactor 基本概述

Reactor 是 Java 反应式编程的框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应。反应式编程通过异步的方式提高了系统的吞吐量,有效利用了系统资源,但其并不能提高响应的速度。其在根本上是一种 pub/sub 模式,通过在发布者与消费者之间的预留通道实现异步响应,类似于 Future 的概念。

发布者和订阅者是 Reactor 中有两个最基本的概念,可以简单理解为消息队列中的生产者和消费者的概念。在Reactor中发布者有两个,一个是 Flux,一个是 Mono。Flux 代表的是 0-N 个元素的响应式序列,而 Mono 代表的是 0-1个的元素的结果。

因为流式处理一般将处理多个元素,因此本文主要学习 Flux 相关的内容来理解 Reactor 框架的原理和使用。



Flow 核心概念

JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。

在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber:

  • Publisher 发布数据;

  • Subscriber 接收 Publisher 发布的数据并进行消费;

  • 在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理;

  • Publisher 与 Subscriber 之间的订阅关系由 Subscription 控制;



Publisher 数据发布者

Publisher 是数据的发布者,它是一个函数式接口,其中只有一个方法,该方法可以配置其对应的消费者,源码如下:

    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }



Subscriber 数据消费者

Subscriber 负责消费数据,其内部定义了 4 个方法,源码如下:

public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }


onSubscribe()

:当与消费者绑定成功时调用该方法;


onNext()

:当接收到一条发布数据时调用该方法;


onError()

:当发布者或消费者发生异常时调用该方法;


onComplete()

:当发布者关闭且所有数据已经被全部消费后调用该方法;



Subscription 订阅关系

Subscription 定义了发布者和消费者的订阅关系,可以理解为两者之间的通道,定义源码如下:

    public static interface Subscription {
        public void request(long n);

        public void cancel();
    }


request()

:该方法用于向通道中请求 n 个数据进行处理;


cancel()

:该方法用于取消发布者和消费者的绑定关系;



Processor 中间处理器

Processor 是数据发布者与消费者中间的处理器,可以对发布者发布的数据进行预处理后再发送给消费者进行消费。

实际上其既是发布者,又是消费者,即在两者中间进行了一次数据的处理转发,其源码如下:

    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }



消息发布订阅示例代码

上述几个核心概念的基本使用代码如下,通过代码可以更好的理解 Flow 的原理

public class FlowDemo {

    public static void main(String[] args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                // 向数据发布者请求数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("接收到消息>>>" + item);
                // 接收数据后可以继续接收或取消订阅
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // 产生异常后直接取消订阅
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 发布者所有数据全部被接收,且发布者已经关闭
                System.out.println("数据接收完毕~");
            }
        };
        // 将订阅者注册到发布者
        publisher.subscribe(subscriber);
        // 发布消息
        for (int i = 0; i < 10; i++) {
            // 发送数据
            publisher.submit(String.valueOf(i));
        }
        // 关闭发布者
        publisher.close();
        // 维持程序保持开启
        while (true) {}
    }

Processor 处理器的示例代码:

public class ProcessorDemo {

    public static void main(String[] args) {

        class DataFilter extends SubmissionPublisher<String> implements Flow.Processor<String,String>{

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                this.submit("【处理后数据】" + item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                this.close();
            }
        }

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        DataFilter dataFilter = new DataFilter();
        publisher.subscribe(dataFilter);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                //向数据发布者请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("接收到消息>>>" + item);
                //接收完成后,可以继续接收或者不接收
                //this.subscription.cancel();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                //出现异常,就会来到这个方法,此时直接取消订阅即可
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                //发布者的所有数据都被接收,并且发布者已经关闭
                System.out.println("数据接收完毕~");
            }
        };

        dataFilter.subscribe(subscriber);

        for (int i = 0; i < 1000; i++) {
            // 发送数据
            System.out.println("产生数据" + i);
            publisher.submit(String.valueOf(i));
        }
        //关闭发布者
        publisher.close();
        // 维持程序保持开启
        while (true) {}
    }

}



Backpressure 回压

Backpressure 回压是指消费能力低于生产能力时,Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,源码如下:

    static final int DEFAULT_BUFFER_SIZE = 256;

    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

当 Subscription 存满时,生产者将根据消费者的消费能力动态的调整数据发布的速度,以实现消费者对生产者的反向控制。



Backpressure 示例代码

运行代码,可以观察到,生产者在生产数据达到 Subscription 上限时便停止生产,然后根据消费者的消费速度动态调节生产速度。

public class FlowDemo0 {

    public static void main(String[] args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                // 向数据发布者请求数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("接收到消息>>>" + item);
                // 等待1秒再接收下一条数据
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 接收数据后可以继续接收或取消订阅
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // 产生异常后直接取消订阅
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 发布者所有数据全部被接收,且发布者已经关闭
                System.out.println("数据接收完毕~");
            }
        };
        // 将订阅者注册到发布者
        publisher.subscribe(subscriber);
        // 发布消息
        for (int i = 0; i < 500; i++) {
            // 发送数据
            System.out.println("产生数据" + i);
            publisher.submit(String.valueOf(i));
        }
        // 关闭发布者
        publisher.close();
        // 维持程序保持开启
        while (true) {}
    }
}



Reactor – Mono/Flux API

注:下述代码在测试时,需要添加

while (true) {}

以维持主线程开启,避免数据还没有处理完主线程就关闭的情况。



创建 Mono/Flux


  • just()

    :使用已知内容创建;


  • fromIterable()

    :通过可迭代对象创建;


  • fromStream()

    :从集合流中创建;


  • range()

    :通过范围迭代创建;

        // 1. 创建 Flux/Mono
        // 1.1 使用已知内容创建 Flux
        Flux.just(1, 2, 3, 4, "hello", "world")
                .subscribe(System.out::println);
        // 1.2 通过可迭代对象创建 Flux
        Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
                .subscribe(System.out::println);
        // 1.3 从集合流中创建 Flux
        Flux.fromStream(Stream.of(1,2,3,4))
                .subscribe(System.out::println);
        // 1.4 通过范围迭代创建 Flux
        Flux.range(0,10)
                .subscribe(System.out::println);

  • interval()

    :按照从 0 递增的方式自动创建;


  • delayElements()

    :数据流延时发送方法;

        // 2. 创建时常用的方法
        // 2.1 interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列
        Flux.interval(Duration.ofMillis(100))
                // 限制执行10次
                .take(10)
                .subscribe(System.out::println);
        // 2.2 delayElements() 方法延时发送
        Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
                .delayElements(Duration.ofMillis(1000L))
                .subscribe(System.out::println);



Exception 异常处理


  • doOnError()

    :异常监听,监听到异常的处理逻辑;


  • onErrorReturn()

    :产生异常时返回消息给订阅者;

        Flux.just("1", "2", "3")
                // concatWith() 可以连接一个新的流
                .concatWith(Flux.error(new Exception("手动模拟异常...")))
                .doOnError(Throwable::printStackTrace)
                .onErrorReturn("产生异常,返回 500...")
                .subscribe(System.out::println);

  • subscribe()

    :可以通过传入参数指定异常处理

    • 参数1:定义正常消费逻辑;

    • 参数2:定义异常处理逻辑;

    • 参数3:定义消费完成的逻辑;

        Flux.just("1", "2", "3")
                // concatWith() 可以连接一个新的流
                .concatWith(Flux.error(new Exception("手动模拟异常...")))
                .subscribe(System.out::println,
                        System.err::println,
                        () -> System.out.println("完成..."));

  • onErrorResume()

    :产生异常后重新产生新的流
        Flux.just("1", "2", "3")
                // concatWith() 可以连接一个新的流
                .concatWith(Flux.error(new Exception("手动模拟异常...")))
                .onErrorResume(throwable -> {
                    System.out.println(throwable.getMessage());;
                    return Flux.just("1", "1", "1");
                })
                .subscribe(System.out::println);

  • retry()

    :产生异常后进行重试,参数为重试次数
        Flux.just("1", "2", "3")
                // concatWith() 可以连接一个新的流
                .concatWith(Flux.error(new Exception("手动模拟异常...")))
                .retry(1)
                .subscribe(System.out::println);



常用的数据处理 API


  • merge()

    :按照所有流的实际产生顺序进行合并;


  • mergeSequential()

    :按照流合并的次序进行合并,先消费第一个,在消费第二个;

        Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),
                   Flux.interval(Duration.ofMillis(10)).take(3))
                .subscribe(System.out::println);

        Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),
                             Flux.interval(Duration.ofMillis(10)).take(3))
                .subscribe(System.out::println);

  • buffer()

    :将流中的元素收集为集合;

    • 可以传入两个参数,分别为 maxSize 和 skip。其中,maxSize 代表数据切割后每个集合的最大长度;skip 代表每一次切割后切割器切割起点跳跃的元素个数

  • bufferTimeout()

    :按照时间间隔切割对流中的数据进行收集;

    • 可以传入两个参数,分别为 maxSize 和 maxTime。其中,maxSize 代表数据切割后每个集合的最大长度;maxTime 代表切割的最大时间间隔

  • bufferWhile()

    :当 Predicate 为 true 时才收集当前元素;


  • bufferUntil()

    :直到 Predicate 为 true 时才收集一次所有元素;

    • 当第二个参数为 true 时,满足条件的元素将作为集合的首个元素;若为 false,则满足条件的元素为最后一个元素(默认)
        Flux.range(1, 10)
                .buffer(3, 3)
                .subscribe(System.out::println);

        Flux.interval(Duration.ofMillis(100L))
                .bufferTimeout(9, Duration.ofMillis(1000L))
                .subscribe(System.out::println);

        Flux.range(1, 10)
                .bufferWhile(i -> i % 2 == 0)
                .subscribe(System.out::println);

        Flux.range(1, 10)
                .bufferUntil(i -> i % 2 == 0, false)
                .subscribe(System.out::println);

  • filter()

    :按条件对流中的数据进行过滤;
        Flux.range(1, 10).
                filter(i -> i % 2 == 0).
                subscribe(System.out::println);

  • zipWith()

    :把流中的元素与另一个流中对应元素进行合并,多余元素将被抛弃;

    • 第二个参数可以指定合并的规则
        Flux.just(1, 2)
                .zipWith(Flux.just(3, 4), (s1, s2) -> s1 + "," + s2)
                .subscribe(System.out::println);

  • take()

    :提取指定数量的元素或按时间间隔提取元素;


  • takeLast()

    :提取最后 n 个元素;


  • takeWhile()

    :当 Predicate 返回 true 时才进行提取;


  • takeUntil()

    :提取元素直到 Predicate 返回 true;

        Flux.range(1, 1000)
                .take(10)
                .subscribe(System.out::println);

        Flux.interval(Duration.ofMillis(10))
                .take(Duration.ofMillis(100))
                .subscribe(System.out::println);

        Flux.range(1, 1000)
                .takeLast(10)
                .subscribe(System.out::println);

        Flux.range(1, 1000)
                .takeWhile(i -> i < 20)
                .subscribe(System.out::println);

        Flux.range(1, 1000)
                .takeUntil(i -> i > 100)
                .subscribe(System.out::println);

  • reduce()

    :对流中数据进行规约聚合;


  • reduceWith()

    :对流中数据进行规约聚合,第一个参数可以通过 Supplier 设置初始值;

        Flux.range(1, 100)
                .reduce(Integer::sum)
                .subscribe(System.out::println);

        Flux.range(1, 100)
                .reduceWith(() -> 100, Integer::sum)
                .subscribe(System.out::println);

  • map()

    :将流中的元素依次进行映射处理;


  • flatMap()

    :将流中的每一个元素看作一个新的流进行处理并按实际生产顺序进行合并;


  • flatMapSequential()

    :按订阅顺序进行合并;

        Flux.range(1, 100)
                .map(x -> {
                    return x / 2;
                })
                .subscribe(System.out::println);

        Flux.just(5, 10)
                .flatMap(x ->
                        Flux.interval(Duration.ofMillis(x * 10))
                        .take(x)
                )
                .subscribe(System.out::println);


        Flux.just(5, 10)
                .flatMapSequential(x ->
                        Flux.interval(Duration.ofMillis(x * 10))
                        .take(x)
                )
                .subscribe(System.out::println);

  • skip()

    :跳过指定条数或跳过指定时间间隔;
        Flux.just(1, 2, 3, 4, 5, 6, 7)
                .skip(2)
                .subscribe(System.out::println);

        Flux.interval(Duration.ofMillis(100))
                .skip(Duration.ofMillis(300))
                .subscribe(System.out::println);

  • distinct()

    :去重处理;
        Flux.just(1, 1, 2, 2, 5, 6, 7)
                .distinct()
                .subscribe(System.out::println);

  • last()

    :取流中的最后一个元素;


  • next()

    :取流中的第一个元素;

        Flux.just(1, 2, 3, 4, 5)
                .last()
                .subscribe(System.out::println);

        Flux.just(1, 2, 3, 4, 5)
                .next()
                .subscribe(System.out::println););

  • doOnError()

    :产生异常时执行;


  • doOnComplete()

    :数据接收完成时执行;


  • doFinally()

    :最后执行;

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .concatWith(Flux.error(new Exception()))
                //错误时执行
                .doOnError(e -> System.out.println("产生异常>>>" + e))
                //完成时执行
                .doOnComplete(() -> System.out.println("数据接收完成~"))
                //最后执行
                .doFinally(t -> System.out.println("最后执行信息>>>" + t))
                .subscribe(System.out::println);



参考文章


  1. Java反应式框架Reactor中的Mono和Flux

  2. 探究WebFlux之Reactor

  3. WebFlux 前置知识(四)

  4. JAVA Reactor API ( Flux和Mono)的简单使用



版权声明:本文为zqf787351070原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。