响应式框架 Reactor 基础
Reactor 是由基于 Reactive Streams 技术构建的全异步类响应式框架。
概念
Reactor 认为,万物皆为数据,数据可以像 Stream 流一样,对数据做一系列流式处理来解决问题,而且它还对每一个流处理都实现全异步执行,因为它每个流处理过程都包含缓冲。
对Reactor来说,它只有单条数据,即 Mono,和多条数据,即 Flux,它们都是做为数据的载体,并通过载体对数据进行一次或多次的处理。
在每一次的数据处理中,都带有状态信号,即可感应到数据列的加工,完成,取消,异常,数据进出等状态。
使用
使用 Reactor 框架包,引入以下包,即可使用
1.引入 bom 管理
# 引入bom 管理
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2025.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
2.引入依赖
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
基础代码实现
我们可以通过 Mono 或 Flux 创建一个载体,用于存放要处理的数据:
1.使用 Mono 创建一个携带一条数据的载体
// 创建一个 Mono (即单条数据) 集成载体
Mono<Integer> mono = Mono.just(1);
// 我们可以用这个载体做各种链式操作,如 filter 过滤,过滤后给到订阅者进行输出
mono
.log() // 对上一条的链进行监控
.filter(element -> true) // 对上一条链进行过滤
.log() // 对被过滤后的数据链进行监控
.subscribe(System.out::println); // 提交给订阅者输出
2.使用Flux 创建一个携带多条数据的载体
Flux<Integer> flux = Flux.just(1,2,3,4,5,6,7,8);
flux
// 对多条数据进行处理:只收录大于3的数据作为下一个数据链
.filter(element->element>3)
.subscribe(System.out::println);
输出 4 5 6 7 8
状态信号
Reactor 带有很多在处理过程中的状态方法,通常使用 doOnXXXX 来表示处理数据链的状态
下面列出常见的状态回调方法
- doOnNext: 当数据(不带状态信号)到达的时候触发
- doOnEach: 当元素(流的数据和信号)到达的时触发
- doOnRequest: 消费者请求流元素时候触发
- doOnError: 流发生错误
- doOnSubscribe: 流被订阅的时候触发
- doOnTerminate: 发送取消、异常信号中断了流时触发
- doOnCancel: 流被取消时触发
- doOnDiscard: 流中元素被忽略的时候触发
SignalType 的类型
SignalType 类型是在整个状态中对于流的状态枚举,可以通过使用 来获得这个类型
- SUBSCRIBE: 被订阅
- REQUEST: 请求了N个元素
- CANCEL: 流被取消
- ON_SUBSCRIBE: 在订阅的时候
- ON_NEXT: 在元素到达
- ON_ERROR: 在流错误
- ON_COMPLETE: 在流正常完成时
- AFTER_TERMINATE: 中断以后
- CURRENT_CONTEXT: 当前上下文
- ON_CONTEXT: 感知上下文
代码实现
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
flux
.doOnSubscribe(subscription -> System.out.println("被订阅了" + subscription))
.doOnNext(integer -> System.out.println("到达数据:" + integer))
.doOnError(error -> System.out.println("发生错误了:" + error.getMessage()))
.doOnRequest(l -> System.out.println("请求流元素:" + l))
// 对多条数据进行处理:只收录大于3的数据作为下一个数据链
.filter(integer -> integer > 3)
.doOnNext(integer -> System.out.println("filter之后的到达数据:" + integer))
.subscribe(integer -> System.out.println("订阅者得到的数据 = " + integer));
注意:对于doOnXXX的事件方法,具有顺序意义,比如 doOnNext 放在经过 filter 之后的话,那么它只会检测到 filter 过的流元素,而且它的流元素是随着每一个加工而变化的。因此,上面的代码执行结果如下:
被订阅了reactor.core.publisher.FluxArray$ArrayConditionalSubscription@4690b489
请求流元素:9223372036854775807
到达数据:1
请求流元素:1
到达数据:2
请求流元素:1
到达数据:3
请求流元素:1
到达数据:4
filter之后的到达数据:4
订阅者得到的数据 = 4
到达数据:5
filter之后的到达数据:5
订阅者得到的数据 = 5
到达数据:6
filter之后的到达数据:6
订阅者得到的数据 = 6
到达数据:7
filter之后的到达数据:7
订阅者得到的数据 = 7
到达数据:8
filter之后的到达数据:8
订阅者得到的数据 = 8
官方提供的订阅者类 BaseSubscriber
使用自定义消费者,推荐直接使用 BaseSubscriber 的逻辑。使用 new BaseSubscriber<> 来创建一个订阅者
BaseSubscriber<>的生命周期
- hookOnSubscribe
- 当发布者成功与订阅者绑定时会回调的方法
- hookOnNext
- 当数据到达时回调的方法
- hookOnComplete
- 当处理完成了的回调方法
- hookOnError
- 当处理出现异常时的回调方法
- hookOnCancel
- 当处理间调用了 cancel() 方法时,会回调hookOnCancel方法
- hookFinally
- 不论处理正常完成,还是出现了异常,最终都会在结束时回调的方法
获取指定个订阅数 request(n)
当绑定完成后,订阅者可向发布者请求获取一个或多个任务回来处理。注意:获取的一次任务中,可能任务里有1个或多个数据。
request(2) 时,会一次获取2个任务数据回来处理,但每个处理顺序执行到 onNext() 中获得数据
获取所有订阅 requestUnbounded()
当绑定完成后,订阅者可向发布者请求获取所有任务
会一次获取所有任务数据回来处理,但每个处理顺序执行到 onNext() 中获得数据
发布者高级使用
缓冲限流 limitRate()
对比 request 和 requestUnbounded 的区别是,
request 是指定获取N个任务,任务中有M个数据。属于发布者一次把所有任务放到“任务篮子”中
而 requestUnbounded 是一次获取所有任务,任务中有M个数据,属于发布者一次把所有任务放到“任务篮子”中
limitRate 是对于发布者而言的,发布者限制每一次放到“任务篮子”中的数量,
假如 limitRate(10) 时,那么 发布者 即使有100个任务,也会限制每一次只放到“任务篮子”中10个,待订阅者订阅
订阅者会实时检测“任务篮子”中的剩余数量,若“任务篮子”中的任务数量已处理 75% 以上了,就会让 发布者 放入新的 75% 的任务量到篮子中(官方的预存策略)。
举例:
- 当【发布者】有 1000 个任务,使用 limitRate(100)
- 【发布者】暂把 100 个任务放到篮子中,待订阅者获取处理
- 【订阅者】开始获取任务个数,request 或 requestUnbounded
- 当篮子中剩下25个任务时(处理完成75%时)
- 【发布者】继续往篮子中放入75%的任务,即 1000*75% = 75 个
- 【订阅者】继续处理
- 如此类推
凑数分组订阅 buffer()
流中提供一个方法,可以让流中的数据,以多少个为一组,一次发送给订阅者进行处理。
比如流中的数据有 (1,2,3,4,5,6,7,8,9,10)
使用 buffer(3) 时,它会把它们分组成 [1,2,3],[4,5,6],[7,8,9],[10] 这4个数组,并逐一发给订阅者,而不再是分成10次发给订阅者了
订阅者收到的数据类型将为 List<> 数组类型。
bufferUntilChanged() 数据变化分组订阅
与 buffer 功能差不多,但是 buffer 只会按顺序的对数据进行分组,而 bufferUntilChanged 具有另一个功能,就是按照条件来判断是否需要分组,在一对多查询中比较常用。
bufferUntilChanged 提供两种方式
- 第一种:提供一个 Flux<> 的编历的函数方法
- 用来一个个数据去让用户加工,判把加工后的数据创建新的流作为新的 Flux<>
- 第二种:提供一个 Flux<> 的编历的函数方法,提供一个 上一个 数据 和 下一个 数据,用于判断 Boolean
- 除了第一种的操作外,还会提供上一个和下一个数据,用以判断这两个数据是否应该分为一组,如果返回 true 则分为一组,返回 false 则另外分组
如:
- book1{ group:1, name:书本1 },book2{ group:1, name:书本2 },book3{ group:2, name:书本3 }
- 使用 bufferUntilChanged 时,可以通过对比 group 来判断是否分为一组,因 book1 和 book2 的 group 都为1,所以会并成一组
- 效果:{ [book1, book2], [book3] }
注意:它只会拿上一个数据和下一个数据进行判断,所以分组时,数据不要零散,否则会分出很多零散组
如:
- book1{ group:1, name:书本1 }, book3{ group:2, name:书本3 }, book2{ group:1, name:书本2 }
- 它会 book1 和 book3 对比,结果不能分组
- book3 和 book2 对比,结果不能分组
- 最终结果:{ [book1], [book3], [book2] },即使 book1 和 book2 的 group 一样,因顺序不同,至使合组失败。
自定义同步创建序列数据 generate
通过代码的方式创建数据序列,但是这个方法是同步创建的。
使用 next() 方法加入一个数据到列中:(但是如果直接调用 next() 的话,只允许调用一次,否则会报异常)
Flux.generate(sink->{
// 通过 SynchronousSink 对象来创建一个序列数据
sink.next(1);
}).subscribe();
使用 next() 方法加入多个数据到列中:(需要先定义原始数据值,再进行 next() 加入数据,但是加入完一定要调用 complete)
Flux.generate(() -> 0, (state, sink) -> {
// ()->0 是定义 state 的原始数据值
// state 是原始数据,把 state 加入到序列中
sink.next(state);
// 当 需要停止加入序列时,使用 sink.complete() 进行停止加入
if (state > 10) sink.complete();
return state + 1;
})
.subscribe();
自定义异步创建序列数据 create
自定义处理加工创建序列数据 handler
就是在原有的数据基础上,通过自定义代码来为传来的数据进行加工成自己想要的样子,再加入到序列中,会使用同步方式处理
Flux.range(1,10)
.handle((element,sink)->{
// element 是取得的原始数
System.out.println("原始数据为:"+element);
// sink 是用于把处理后的数据加入到序列流中的,这时的流类型从 int 变为 string 了
sink.next("处理后的数据"+element);
})
.subscribe();
自定义线程调度 Schedulers
对于流的处理是在什么进程下进行的问题,官方给出了明确的指示,就是默认 使用当前线程执行一切的缓存和数据处理。当然我们可以设定【发布者】的发布线程和【订阅者】的订阅处理线程
线程变更可以有以下几种情况
- Schedulers.immediate():默认的线程调度,即永远使用当前线程执行
- Schedulers.single():开启一条单一的线程来负责处理这一序列
- Schedulers.boundedElastic():使用官方的线程调度来创建一定的线程来处理这一序列,线程数以 10*CPU 核心数来定,有限的线程池
- Schedulers.fromExecutor():使用用户自定义的 Ranner 线程处理
- Schedulers.newBoundedElastic():使用用户自定义的线程声明来创建对应的线程数来处理(即如果不想使用官方的 10*CPU数 定义的线程数可使用此方法)
- Schedulers.parallel():使用并发池线程来处理,与 boundedElastic 的区别是,parallel 是并发的,可能无限线程
- Schedulers.newParallel():使用用用创建的自定义并发池来处理
在发布者中,我们可以通过使用 publishOn() 方法来定义要用于【发布者】发布任务时所使用的线程调度类型
在发布者中,也可以使用 subscribeOn() 方法来定义用于【订阅者】在处理任务时所使用的线程调度类型
Flux.range(1,10)
// 设置发布者的线程调度类型
.publishOn(Schedulers.boundedElastic())
// 设置订阅者的线程调度类型
.subscribeOn(Schedulers.parallel())
.handle((element,sink)->{
// element 是取得的原始数
System.out.println("原始数据为:"+element);
// sink 是用于把处理后的数据加入到序列流中的,这时的流类型从 int 变为 string 了
sink.next("处理后的数据"+element);
})
.subscribe();
Flux 常规操作
block 取出数据
如果我们希望把Flux中的处理过的最终数据返回给我们,可以使用 block 进行取出
同时还有 blockFirst 取出第一个,blockLast 取出最后一个,或 collectList().block() 取出整个数组数据
List<Integer> block = Flux.range(1, 10)
.map(v -> v + 10)
.collectList() // 把数据转成一个 List 的 Mono 对象
.block(); // 在Mono 对象中取出 List 数据
filte 过滤
filte 过滤会提供一个值,要求返回一个 boolean,为true的会被收录到新流
// 在 元素中各取一个,并判断是否 %2 为0的,如果是就会加入到下一个新流中
Flux.just(1, 2, 3, 4).filter(v -> v % 2 == 0).subscribe();
flatMap 打散加工
flatMap 打散会提供一个值,并要求返回一个Flux值,它们会被记录并合并成一个新的流中
Flux.just("0-1", "1-2", "2-3", "3-4").flatMap(v->{
// 通过 "-" 对每个元素进行分割
String[] split = v.split("-");
// 把分割后的数据加到新流中
return Flux.fromArray(split);
}).subscribe();
// 效果则变为【“0”,"1","1","2","2","3","3","4"】
concatMap 连接加工
concatMap 连接加工会提供一个做,并要求返回一个Flux值,两个流整体并排合并,这与 merge 有一定的区别
Flux.just(1,2,3,4).concatMap(v->{
// 在原有的 1,2,3,4 基础上,并合 9,8,7
return Flux.just(9,8,7);
}).subscribe();
concat,concatWith 连接,或与xx连接
concat 是用于在方法体内连接多个流,合并为一个流
// 一次合并多个流到一个流中
Flux.concat(Mono.just("T"),Mono.just("Z"),Mono.just("M")).subscribe()
// 效果:【“T”,"Z",M】
concatWith 是在已有一个流的条件下,连接在别的流的后面,并合并为一个新的流
// 在已有的流的条件下,合并新的流,成为一个新的流
Flux.just("TZMing","1").concatWith(Flux.just("Jurrivh","2")).subscribe();
// 效果【“TZMing”,"1" ,"Jurrivh", "2"】
merge 和 mergeWith 合并
merge 与 concat 功能差不多,但有区别,concat 中对两个流进行并排合并(或叫连接),而merge 则是把两个流混合在一起,它取决于两个流的元素谁先取得,就谁先合并
Flux.merge(
// 第一个流每一秒放出一个
Flux.just("a","b","c").delayElements(Duration.ofSeconds(1)),
// 第二个流每两秒放出一个
Flux.just("d","e","f").delayElements(Duration.ofSeconds(2))
);
当他们的元素放出有时限时,此时它们会按照放出的元素的先后顺序来合并成一个新的流
效果:【"a","b","d","c","e","f"】 说明:第一秒:a出来,第二秒 b 和 d 出来,第三秒,c 出来,第四秒 e 出来,第五秒 无,第六秒 f 出来
mergeWith 是在存在一个流的条件下加入新的流进行合并成一个新的流。
mergeSequential 优先流合并
mergeSequential 会按照谁先推出第一个元素的流,则这个流将会被优先全部排在前面
比如:
- 流1【1,2,3】, 流2【4,5,6】流3【7,8,9】
- 当 流2【4】先出来,再到流1【1】,最后到流3【7】,那么流2的整个元素会排最前,其次是流1,最后流3
- 效果【4,5,6, 1,2,3 ,7,8,9】
zip 和 zipWith 压缩
zip 会与两个或多个流俩俩组合(目前最多只能一次合并8个流),并形成一个新的元组类型元素流,若有不能俩俩组合的多余元素,则被忽略。
假设:
- 流1【1,2,3】
- 流2【"a","b","c","d"】
- 效果【[1,"a"],[2,"b"],[3,"c"]】,因为 "d" 无法与流1中的元素组合,将被忽略
使用元组类型,当前有 Tuple2 到 Tuple8,可同时合并8组
Flux<Tuple2<Integer, String>> zip = Flux.zip(
Flux.just(1, 2, 3),
Flux.just("d", "e", "f")
);
transform 和 transformDeferred 流转换
transform 一次性提供了整个流数据,要求返回一个流数据
transform 是一个比较特殊的变换,在流中,除了包含了 流数据 以外,流中还包含一个【信号】值,这个信号值是包含,成功,异常,完成等信息
同样也包含了一个值用于记录【它已被订阅了几次】,transform对这个值做了运算。
- transform: 【订阅值】与其它订阅者不共享,每一个订阅者对发布者来都说是第一次订阅
- transformDeferred:【订阅值】与其它订阅者共享,当存在两个以上的订阅者时,它们之间的数据将会共享
代码说明:
// 创建一个原子整数,整数初始化值为0
AtomicInteger times = new AtomicInteger(0);
Flux<String> pub = Flux.just("a", "b", "c", "d").transform(v -> { // transform 时的
Flux<String> pub = Flux.just("a", "b", "c", "d").transformDeferred(v -> { // transformDeferred 时的
// 整数值先+1,再判断当前是否为1
if (times.incrementAndGet() == 1) {
// 如果是,则把所有元素中的数据转为大写
return v.map(String::toUpperCase);
} else {
// 如果不是,则原流返回
return v;
}
});
pub.subscribe(v -> System.out.println("订阅者1:" + v));
pub.subscribe(v -> System.out.println("订阅者2:" + v));
- 对于 transform 来说,订阅者1 与 订阅者2 不会被共享 times 的值,因此,订阅者1 被订阅时,times = 1,转大写生效,订阅者2 被订阅时,times = 1,转大写也生效
- 效果: 订阅者1->【“A”,“B”,“C”,“D”】,订阅者2->【“A”,“B”,“C”,“D”】
- 对于 transformDeferred 来说,,订阅者1 与 订阅者2 会共享 times 的值,因此,订阅者1 被订阅时,times = 1,转大写生效,订阅者2 被订阅时,times = 2,返回原流数据
- 效果:订阅者1->【“A”,“B”,“C”,“D”】,订阅者2->【“a”,“b”,“c”,“d”】
defaultIfEmpty 预存数据
检查流数据是否为空,如果是,则补数据
// 如果这个流来自一个不确定的方法中得到,而这个方法有可能返回一个空数据流时
public static Flux<String> createFlux(){
return Flux.empty();
}
// 如果获取到的流是空流时,预设一个流数据
createFlux().defaultIfEmpty("a").subscribe();
switchIfEmpty 预存流
与 defaultIfEmpty 类似,但是 defaultIfEmpty 是预存一个数据值,而 switchIfEmpty 则是预存一个流
// 如果这个流来自一个不确定的方法中得到,而这个方法有可能返回一个空数据流时
public static Flux<String> createFlux(){
return Flux.empty();
}
// 直接预创建一个流,而不是放入一个数据
createFlux().switchIfEmpty(Flux.just("a","b","c")).subscribe();
Context API 上下文
Context 是用于早先的流操作,获取后操作提供的数据
在所有支持 context 上下文的加工流操作中,都会提供 context 上下文对象,供我们在上游获取下游的数据
Flux.just(1, 2, 3)
// 支持提供 context 上下文的操作方法
.transformDeferredContextual((flux, context) -> {
// 上游获取了下游的数据
return flux.map(v -> v + (String) context.get("prefix"));
})
// 下游设置的数据
.contextWrite(Context.of("prefix", "data"))
.subscribe(System.out::println);
错误异常处理
对于在流中处理数据时出现的错误或异常,Flux 提供了多种解决错误和异常的方式
onErrorReturn
onErrorReturn 是指当在流中处理时发生了异常时,则返回一个自定义的数据给到流,但异常后面的数据将不会继续处理,并返回正常结束信号。
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorReturn(99) // 当处理到 10/0 时会捕获异常并返回 99 并返回正常结束信号
.subscribe(System.out::println,
err-> System.out.println("err = " + err),
()-> System.out.println("处理正常完成"));
结果【10 5 3 99 处理正常完成】
另外, onErrorReturn 可以支持特定的异常类型处理,即可以提供某种异常,只对抛出某种异常进行处理
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
// 只对产生 RuntimeException 时才会捕获
.onErrorReturn(RuntimeException.class,99)
.subscribe(System.out::println,
err-> System.out.println("err = " + err),
()-> System.out.println("处理正常完成"));
onErrorResume
onErrorResume 与 onErrorReturn 的区别在于,onErrorResume 是需要提供一个返回 Flux 流的数据,而不是提供一个自定义数据。
同样,onErrorResume 会吃掉异常,并停止异常之后的数据处理行为。
onErrorResume 即可以返回一个正常的流,也可以返回一个异常,用作记录
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorResume(err -> Flux.just(99)) // 需要创建一个新的流作为补偿异常的数据的补充
.subscribe(System.out::println,
err-> System.out.println("err = " + err),
()-> System.out.println("处理正常完成"));
当然我们可以在 onErrorResume 中创建一个自定义的异常,用于记录或特定需求
// 定义 自己的异常类
public static class OtherException extends RuntimeException{
public OtherException(String err){
super(err);
}
}
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorResume(err -> Flux.error(new OtherException(err.getMessage())))
.subscribe(System.out::println,
err-> System.out.println("err = " + err),
()-> System.out.println("处理正常完成"));
结果:
10
5
3
err = cn.unsoft.reactor.ReactorApplication$OtherException: / by zero
onErrorComplete
onErrorComplete 是对于一旦出现了异常,就直接不处理异常数据,吃掉异常并直接返回正常结束信号。
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorComplete()
.subscribe(System.out::println,
err-> System.out.println("err = " + err),
()-> System.out.println("处理正常完成"));
onErrorMap
onErrorMap 是允许用户重新包装一个新的异常,作为新的异常抛出,与 onErrorResume 的区别是, onErrorResume 需要返回由 Flux 封装的异常,而 onErrorMap 则可以直接提供异常的对象
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorMap(err->new OtherException(err.getMessage()))
.subscribe();
onErrorContinue
onErrorContinue 会对出现异常的数据进行抛异常输出,但是不会停止对数据的处理,而是继续往下执行。会在提供两个参数,分别为 error 异常对象,和当前出现处理错误的数据
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.onErrorContinue((err, value)->{
System.out.println("错误:"+ err.getMessage());
System.out.println("错误的数据:"+ value);
})
.subscribe(v-> System.out.println("v = " + v));
结果:
v = 10
v = 5
v = 3
错误:/ by zero
错误的数据:0
v = 2
onErrorStop
onErrorStop 是站在发布者的角度来看,一旦发现出现异常,直接不再发布后续的任务,若存在多个订阅者时,onErrorStop 会使所有的订阅者停止订阅行为
它与 onErrorComplete 的区别在于,onErrorComplete 会让单个订阅者产生完成信号,如果多个订阅者它们的处理进度不一至,onErrorComplete 不会影响其它订阅者的处理行为,而 onErrorStop 则会在其中一个订阅者出现异常后,通知所有订阅者停止处理,不管其它订阅者是否处理到出问题的数据上。
doFinally
和 try cache 一样,doFinally 就是不管是否出现错误,都会被执行的
扩展:doOnError
doOnError 是订阅者(或由发布者定义)的生命周期事件,它只是作为一个通知,与onError 不一样的地方是,onError 会吃掉异常并给出解决方案,而doOnError 只是一个通知,不会改变异常情况,并且,当处理过程出现异常时,除了能吃掉异常的onError 方法外,onError的报错和 doOnError 都会被执行通知,不会只执行其一。
超时与重试
timeout 超时
可以对任务的派发进行超时限制,限制在多少时间内让发布者派发任务,否则就会报异常。
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.timeout(Duration.ofSeconds(1)) // 如果使用延时派发,如 .delayElements(Duration.ofSeconds(2)) 则会引起超时
.onErrorComplete()
.subscribe(v-> System.out.println("v = " + v));
retry 重试
当出现派发任务获取不到时,可以使用 retry 来对任务重新获取测试,如果不填入次数,默认是永久重试
Flux.just(1, 2, 3, 0, 5)
.map(v -> 10 / v)
.delayElements(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(1))
.retry(2) // 设置重试2次
.onErrorComplete()
.subscribe(v-> System.out.println("v = " + v));
Sinks 在 Flux 中的消息管道控制
Sink 是控制【发布者】与【订阅者】之间的信息传播通道的作用,可以在这个管道中做多种行为管理。
在 Flux 和 Sub 订阅者之间,系统都会默认给我们创建一个管道作为它们之间的订阅消息传播渠道,我们可以自定义这个管道的一些属性
Sink 也叫接受器,数据管道,所有数据顺着这个管道往下走的,
Sinks.Many<Integer> integerMany =
// 使用Sinks 管道管理器创建一个管道
Sinks
// 使用【允许多播】的管道
.many()
// 使用【单播模式】
.unicast()
// 使用【背压模式】
.onBackpressureBuffer(new LinkedBlockingQueue<Integer>());
// 尝试在管道中新增一个数据
integerMany.tryEmitNext(1);
// 把管道转成 Flux 流数据(其实是根据管道信息来创建流对象)
Flux<Integer> flux = integerMany.asFlux();
Sinks.many.unicast 单播
Sinks 是用于管理消息管道的,所以它可以控制这个消息管道允许单个订阅者,还是允许多个订阅者
这个管道只能绑定单个订阅者,当出现两次订阅时,会出现异常
// 使用支持多播的管道创建一条单播管道
Sinks.many().unicast()
Sinks.many.multicast 多播
Sinks 是用于管理消息管道的,所以它可以控制这个消息管道允许单个订阅者,还是允许多个订阅者
这个管道能绑定多个订阅者
// 使用支持多播的管道创建一条多播管道
Sinks.many().multicast()
Sinks.many.replay 重放
这个管道能重放元素,是否给后来的订阅者把之前的元素依然发给它。默认使用 重放
对于后来订阅者来说,一些已被派发出去的任务,是否需要给后来订阅者
Sinks.many().replay()
cache 缓存
cache 缓存是针对后来订阅者的,默认会对所有后来订阅者进行重放所有已派发的任务。
假如,发布者需要发布10个任务,而在发布了5个任务时,后来订阅者才开始订阅
cache 是用于指定重放前几个任务给后来者的。
// 对前三个数据给后来订阅者重新派发
flux.cache(3);
共有 0 条评论