反应式编程的思想最近得到了广泛的流行。 在Java
平台上有流行的反应式库RxJava和Reactor。反应式流规范的出发点是提供一个带非阻塞负压( non-blocking backpressure ) 的异步流处理规范。反应式流规范的核心接口已经添加到了Java9
中的java.util.concurrent.Flow
类中。Flow
中包含了Flow.Publisher
、Flow.Subscriber
、Flow.Subscription
和 Flow.Processor
等 4 个核心接口。Java 9
还提供了SubmissionPublisher
作为Flow.Publisher
的一个实现。RxJava 2
和Reactor
都可以很方便的与Flow
类的核心接口进行互操作。 Reactive Streams API ( java.util.concurrent.Flow)
实现了异步非阻塞的流处理方式。
Reactive Streams
是一项非阻塞背压的异步流处理方式,因此他们有一组Publisher
和Subscriber
,Publisher
将数据流push
到Subscriber
,Subscriber
则将消费这些数据流,并通过backpressure
(个人理解为回压)来反馈Subscriber
消费时的压力,调节Publisher
生产的速度。
# Java 9 Flow API
Java 9 Flow API
实现了Reactive Streams
规范。Flow API
是Iterator
和Observer
模式的组合。Iterator
在pull
模型上工作,其中应用程序从源中拉出数据,而Observer
在push
模型上工作,并在将数据从源推到应用程序时进行处理。
Java 9 Flow API
订阅者可以在订阅发布者的同时请求N
个项目。 然后将项目从Publisher
推送到Subsriber
,直到没有其他项目可推送或出现一些错误为止。
其中所有的方法都是void
,因为所有的方法都是异步执行的。
# java.util.concurrent.Flow.Publisher
Publisher
函数式接口用于将数据流发送到Subscriber
,publisher
有两个方法用于发送数据,一个是submit
,一个是offer
。两个方法下面实际都是调用的doOffer
方法,所以,offer
方法提供了置顶延迟时间后丢弃的策略,而submit
是offer
的简单实现,是一致阻塞不丢弃。
// 绑定订阅者
public void subscribe(Subscriber<? super T> subscriber);
2
# java.util.concurrent.SubmissionPublisher
该接口在实现了publisher<T>
之外还实现了AutoCloseable
接口,因此具有异步提交数据流到当前的订阅者中直到连接被关闭功能。所以可以直接用try
块来进行资源的管理。发布者,通过使用Exceutor
框架提交reactive stream
数据到订阅者
代码演示:使用SubmissionPublisher
作为发布者示例,看一下响应流实现的测试程序。
// Employee 用户自己定义的类
public static void main(String[] args) throws InterruptedException {
//create publisher
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
//register subscriber
MySubscriber subscriber = new MySubscriber();
publisher.subscribe(subscriber);
List<Employee> employees = xxxxx;
//publish items
employees.forEach(publisher::submit);
while (employees.size() != subscriber.getCount()) {
TimeUnit.SECONDS.sleep(5);
}
publisher.close();
System.out.println("exiting app");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# java.util.concurrent.Flow.Subscriber
Subscriber
则为消费处理Publisher
发送过来的数据流
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
2
3
4
【1】onSubscribe: 这是在Subscriber
服务器订阅Publisher
以接收消息时调用的第一种方法。 通常,我们调用subscription.request
开始从处理器接收具体数量的数据流。
【2】onNext: 从Publisher
处收到数据时,将调用此方法,这是我们在其中实现业务逻辑以处理data stream
,然后从Publisher
处请求更多数据的地方。
【3】onError: 当发生不可恢复的错误时,将调用此方法,我们可以使用此方法清理任务,例如关闭数据库连接。
【4】onComplete: 这类似于finally
方法,并且在Publisher
没有push
任何其他data stream
并且关闭Publisher
时被调用。 我们可以使用它发送成功处理流的通知。
# java.util.concurrent.Flow.Subscription
发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。这用于在Publisher
和Subscriber
之间创建异步非阻塞连接。Subscriber
调用其请求方法以向Publisher
获取新的data stream
。它还具有取消方法以取消订阅,即关闭Subscriber
和Publisher
之间的连接
代码演示:subscription
变量需要保存其引用。以此在onNext
方法中对Publisher
进行请求数据流。count
变量用于记录处理的次数,将在main thread
中通知任务是否处理完成
// Employee 用户自己定义的类
public class MySubscriber implements Subscriber<Employee> {
private Subscription subscription;
private int count = 0;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscribed");
this.subscription = subscription;
this.subscription.request(1);
System.out.println("onSubscribe requested 1 item");
}
@Override
public void onNext(Employee item) {
System.out.println("Process Employee" + count);
count++;
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("some error happen" + throwable);
}
@Override
public void onComplete() {
System.out.println("all process done");
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# java.util.concurrent.Flow.Processor
此接口同时扩展了Publisher
和Subscriber
,用于在Publisher
和Subscriber
之间转换消息。可以通过处理器连接发布者、订阅者以及其他处理器。Processor
本身同时继承了Publisher
与Subscriber
接口,所以可以对元素进行处理转发。主要用于让数据从T
转换为R
。同时,由于Processor
本身也可以接入Processor
,所以Processor
可以组成链来对数据进行处理。
# Back Pressure
当发布者以比订阅者消耗的速度快得多的速度生成消息时,就会形成回压。 Flow API
没有提供任何机制来发出有关背压的信号或进行处理。 但是我们可以设计自己的策略来处理它,例如微调用户或降低消息产生率。 SubmissionPublisher
中提供了一个buffer
的机制,允许Subsriber
最大处理的量,超过该数量将被阻塞。Flow
中还包含了一个默认方法defaultBufferSize()
,用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256
。
# 一次完整的调用流程大概可以描述为
【1】订阅者向发布者发送订阅请求。
【2】发布者根据订阅请求生成令牌发送给订阅者。
【3】订阅者根据令牌向发布者发送请求N个数据。
【4】发送者根据订阅者的请求数量返回M(M<=N)个数据。
【5】重复3,4。
【6】数据发送完毕后由发布者发送给订阅者结束信号。