Reactive-Streams
1. Reactive-Streams 规范
Reactive-Streams 是由多家技术公司(包括 Lightbend、Netflix、Pivotal 等)联合发布的一套处理异步流式数据的标准。 其核心目标是定义一个兼容的、非阻塞的背压(Backpressure)处理模型,帮助开发者处理高速数据流中可能产生的压迫问题。
Reactive-Streams 规范主要针对以下几个问题:
- 异步数据流的处理:以非阻塞方式处理数据,保证资源高效使用。
- 背压处理:当消费者的处理速度低于生产者时,合理管理数据流的流量,避免系统崩溃。
- 跨框架兼容性:在不同响应式框架(如 Reactor、RxJava 等)之间实现互操作。

2. Reactive-Streams 的核心组件
Reactive-Streams 规范定义了四个核心接口

-
Publisher: 负责发布数据,它是数据源的一部分,向订阅者(Subscriber)发送数据。 通过 subscribe 方法,Publisher 可以向多个 Subscriber 注册,通知其数据流的到达. ```java public interface Publisher
{ void subscribe(Subscriber<? super T> subscriber);
}
* **Subscriber**: 数据的消费者,接收 **Publisher** 发布的数据流。**Subscriber** 需要实现四个方法,分别处理不同的状态变化.
```java
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 初始化时调用(接收到 Subscription 对象,订阅者可以通过它控制数据的请求和取消)
void onNext(T t); // 当有新数据到达时调用(每当有数据发布时,Publisher 会调用该方法)
void onError(Throwable t); // 当发生错误时调用(如果发生错误,onError 会被调用,终止数据流。)
void onComplete(); // 当数据流结束时调用(当所有数据发布完成时调用。)
}
-
Subscription: 是连接 Publisher 和 Subscriber 的纽带,它允许 Subscriber 控制数据流的数量。 背压机制就依赖于 Subscription 进行数据流量控制: ```java public interface Subscription {
void request(long n); // 请求 n 个数据元素
void cancel(); // 取消数据流
}
* Processor: 特殊的组件,它既是 Subscriber 也是 Publisher,充当中间处理器,允许在接收到数据后对其进行处理再发布给下游
```java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
// 既能订阅数据,也能发布处理后的数据
}
3. 背压机制(Backpressure)
背压是 Reactive-Streams 规范中的关键概念。它用于处理生产者发送数据过快(正压),而消费者无法及时处理的情况。 没有背压机制的系统很容易出现内存溢出或性能下降。
通过 Subscription 的 request(n) 方法,消费者可以根据自己的处理能力,向生产者请求合适数量的数据。 如果消费者处理不过来,它可以在没有请求更多数据之前停止接收。
4. Reactive-Streams 与 Reactor
Reactor 是 Spring 的响应式编程库,完全基于 Reactive-Streams 规范。它通过 Flux 和 Mono 两种 Publisher 来实现数据流的发布。
- Mono:表示一个包含 0 或 1 个数据的异步流。
- Flux:表示一个包含 0 到多个数据的异步流。
Reactor 的底层实现遵循了 Reactive-Streams 规范,并扩展了许多强大的操作符,用于流的转换、过滤、组合等操作。
5. Reactive-Streams的优势
Reactive-Streams 是构建响应式应用的基础,它提供了以下优势:
- 兼容性:由于 Reactive-Streams 是一个标准,不同的响应式库(如 Reactor 和 RxJava)可以无缝互操作。
- 非阻塞:避免了传统阻塞式 IO 模型中的性能瓶颈。
- 背压支持:通过背压机制,可以控制数据流量,防止消费者过载。
简洁的异步数据处理:通过标准化的接口和操作符,处理异步流数据变得更加简洁和直观。
能摸鱼就很舒服