Reactive stream 响应式流
- Reactive stream是jdk9新特性,提供了一套API,就是一种订阅发布者模式
- 被压,背压是指在异步场景中,发布者发送事件速度远快于订阅者的处理速度的情况下,一种告诉上游的发布者降低发送速度的策略,简而言之,背压就是一种流速控制的策略。
举个例子:假设以前是没有水龙头的,只能自来水厂主动的往用户输送水,但是不知道用户需要多少水,有了Reactive stream,就相当于有了水龙头,用户可以主动的请求用水,而自来水厂也知道了用户的需求
示例代码(需要jdk9以上版本的支持)
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Sub ion;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Sub ion sub ion;
@Override
public void onSubscribe(Sub ion sub ion) {
// 保存订阅关系, 需要用它来给发布者响应
this.sub ion = sub ion;
// 请求一个数据
this.sub ion.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println(\"接受到数据: \" + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.sub ion.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.sub ion.cancel();
}
@Override
public void (Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.sub ion.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println(\"处理完了!\");
}
};
// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);
// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 1000; i++) {
System.out.println(\"生成数据:\" + i);
// submit是个block方法
publiser.submit(i);
}
publiser.submit(111);
publiser.submit(222);
publiser.submit(333);
// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
继续阅读与本文标签相同的文章
上一篇 :
涉嫌虚假宣传 特斯拉在德被消费者保护机构控告
下一篇 :
golang通过cgo调用C++程序
-
《网安动态》多因子身份验证的五个趋势;账户安全的未来:一个
2026-05-18栏目: 教程
-
阿里云开发者社区问答栏目提问规范
2026-05-18栏目: 教程
-
小伙上演“无人驾驶”手舞足蹈,女网友拍视频发朋友圈结果“悲剧了”……
2026-05-18栏目: 教程
-
迎来“无后门协议”!持续压制无效?5G建设选择在于技术问题
2026-05-18栏目: 教程
-
Pepper Metrics - Spring/Spring Boot应用性能监控利器
2026-05-18栏目: 教程
