反应式编程的思想最近得到了广泛的流行。 在Java平台上有流行的反应式库RxJava和Reactor。反应式流规范的出发点是提供一个带非阻塞负压( non-blocking backpressure ) 的异步流处理规范。反应式流规范的核心接口已经添加到了Java9中的java.util.concurrent.Flow类中。Flow中包含了Flow.PublisherFlow.SubscriberFlow.SubscriptionFlow.Processor等 4 个核心接口。Java 9还提供了SubmissionPublisher作为Flow.Publisher的一个实现。RxJava 2Reactor都可以很方便的与Flow类的核心接口进行互操作。 Reactive Streams API ( java.util.concurrent.Flow)实现了异步非阻塞的流处理方式。

Reactive Streams是一项非阻塞背压的异步流处理方式,因此他们有一组PublisherSubscriberPublisher将数据流pushSubscriberSubscriber则将消费这些数据流,并通过backpressure(个人理解为回压)来反馈Subscriber消费时的压力,调节Publisher生产的速度。

# Java 9 Flow API

Java 9 Flow API实现了Reactive Streams规范。Flow APIIteratorObserver模式的组合。Iteratorpull模型上工作,其中应用程序从源中拉出数据,而Observerpush模型上工作,并在将数据从源推到应用程序时进行处理。

Java 9 Flow API订阅者可以在订阅发布者的同时请求N个项目。 然后将项目从Publisher推送到Subsriber,直到没有其他项目可推送或出现一些错误为止。

其中所有的方法都是void,因为所有的方法都是异步执行的。

# java.util.concurrent.Flow.Publisher

Publisher函数式接口用于将数据流发送到Subscriberpublisher有两个方法用于发送数据,一个是submit,一个是offer。两个方法下面实际都是调用的doOffer方法,所以,offer方法提供了置顶延迟时间后丢弃的策略,而submitoffer的简单实现,是一致阻塞不丢弃。

// 绑定订阅者
public void subscribe(Subscriber<? super T> subscriber);
1
2

# java.util.concurrent.SubmissionPublisher

该接口在实现了publisher<T>之外还实现了AutoCloseable接口,因此具有异步提交数据流到当前的订阅者中直到连接被关闭功能。所以可以直接用try块来进行资源的管理。发布者,通过使用Exceutor框架提交reactive stream数据到订阅者

代码演示:使用SubmissionPublisher作为发布者示例,看一下响应流实现的测试程序。

// Employee 用户自己定义的类
public static void main(String[] args) throws InterruptedException {
    //create publisher
    SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

    //register subscriber
    MySubscriber subscriber = new MySubscriber();
    publisher.subscribe(subscriber);
    List<Employee> employees = xxxxx;

    //publish items
    employees.forEach(publisher::submit);
    while (employees.size() != subscriber.getCount()) {
        TimeUnit.SECONDS.sleep(5);
    }
    publisher.close();
    System.out.println("exiting app");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# java.util.concurrent.Flow.Subscriber

Subscriber则为消费处理Publisher发送过来的数据流

public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
1
2
3
4

【1】onSubscribe: 这是在Subscriber服务器订阅Publisher以接收消息时调用的第一种方法。 通常,我们调用subscription.request开始从处理器接收具体数量的数据流。
【2】onNext:Publisher处收到数据时,将调用此方法,这是我们在其中实现业务逻辑以处理data stream,然后从Publisher处请求更多数据的地方。
【3】onError: 当发生不可恢复的错误时,将调用此方法,我们可以使用此方法清理任务,例如关闭数据库连接。
【4】onComplete: 这类似于finally方法,并且在Publisher没有push任何其他data stream并且关闭Publisher时被调用。 我们可以使用它发送成功处理流的通知。

# java.util.concurrent.Flow.Subscription

发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。这用于在PublisherSubscriber之间创建异步非阻塞连接Subscriber调用其请求方法以向Publisher获取新的data stream。它还具有取消方法以取消订阅,即关闭SubscriberPublisher之间的连接

代码演示:subscription变量需要保存其引用。以此在onNext方法中对Publisher进行请求数据流。count变量用于记录处理的次数,将在main thread中通知任务是否处理完成

// Employee 用户自己定义的类
public class MySubscriber implements Subscriber<Employee> {

  private Subscription subscription;
  private int count = 0;

  @Override
  public void onSubscribe(Subscription subscription) {

    System.out.println("Subscribed");
    this.subscription = subscription;
    this.subscription.request(1);
    System.out.println("onSubscribe requested 1 item");
  }

  @Override
  public void onNext(Employee item) {
    System.out.println("Process Employee" + count);
    count++;
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    this.subscription.request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("some error happen" + throwable);
  }

  @Override
  public void onComplete() {
    System.out.println("all process done");
  }

  public int getCount() {
    return count;
  }

  public void setCount(int count) {
    this.count = count;
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# java.util.concurrent.Flow.Processor

此接口同时扩展了PublisherSubscriber,用于在PublisherSubscriber之间转换消息。可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了PublisherSubscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。

# Back Pressure

当发布者以比订阅者消耗的速度快得多的速度生成消息时,就会形成回压。 Flow API没有提供任何机制来发出有关背压的信号或进行处理。 但是我们可以设计自己的策略来处理它,例如微调用户或降低消息产生率。 SubmissionPublisher中提供了一个buffer的机制,允许Subsriber最大处理的量,超过该数量将被阻塞。Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256

# 一次完整的调用流程大概可以描述为

【1】订阅者向发布者发送订阅请求。
【2】发布者根据订阅请求生成令牌发送给订阅者。
【3】订阅者根据令牌向发布者发送请求N个数据。
【4】发送者根据订阅者的请求数量返回M(M<=N)个数据。
【5】重复3,4。
【6】数据发送完毕后由发布者发送给订阅者结束信号。

(adsbygoogle = window.adsbygoogle || []).push({});