SpringBoot Reactive 响应式编程
思想概念
响应式编程与传统的命令式编程思想完全不一样,如果把命令式编程思想比作一个老板把工作推给员工做,那么响应式编程就是员工主动向老板获取工作来做。
如果把老板比作请求,员工比作处理器线程
那么命令式编程对于员工来说,老板有一个任务,就要推给一个员工做,老板需要等待员工做完后并响应,如果些时任务过多,员工少,就会造成员工忙不过来。
而响应式编程是,老板不管员工忙不忙过来,而是把任务推放在一个“任务篮子”中,由员工主动往“任务篮子”中取出任务并完成,完成后,员工会把结果放到自己的“完成篮子”,不会等待老板是否过来响应,接着就继续往“任务篮子”中取任务处理,如此累推。当老板空闲时,就会在“完成篮子”中拿到结果。
术语
响应式编程需要了解一些术语,以更好的描述
- 正压
- 指命令式编程,由任务推给处理器进行处理的行为,称为“正压”
- 背压
- 指响应式编程,由处理器主动获取任务进行处理的行为,称为“背压”
- 发布者 (Publisher)
- 指分派任务的一方,但并不会把任务具体的力分派给某一个处理器,而是投放到“缓冲区”等待“订阅者”获取任务
- 订阅者 (Subscriber)
- 指只接收任务并处理,处理完成后投放到缓冲区等待“发布者”获取结果的一方
- 处理器 (Processor)
- 指间于“发布者”与“订阅者”之间的,既具有“发布者”能力,也具有“订阅者”能力的一方,它通常用于解决需要多次处理的任务,像流程线中的其中一个加工工序,对于“处理器”来说,缓冲区既是上一个“发布者”的“完成篮子”,也是下一个“订阅者”的“任务篮子”。
- 缓冲区、消息队列
- 因为响应式编程,每一个“发布者”,“订阅者”,或是“处理器”都有自己的“篮子”,即用于存放需要处理的任务,或已处理完成的任务,它们之间不会直接通讯,相互独立工作,使得互相之间不存在“谁等待谁完成”的问题,每个人通过“篮子”来存放给对方的数据。
- 订阅关系
- 响应式不是事件机制,它不存在“广播”的想法,订阅关系是用于指定“发布者”和“订阅者”之间的关系,只有关联了的“订阅者”才能拿到“发布者”的任务。所以需要在代码中定义订阅关系。
图解说明
(正压与背压的区别)
说明:
1.正压时,Tomcat需要对每一个任务分配一条线程进行处理,当生产者数据量巨大时,Tomcat无法分配大量线程进行处理,系统会有压垮的可能。
2.背压时,Tomcat会拿出一条线程作为前台接收生产者请求,收到请求后,会把请求数据放到缓冲区,不等待其它线程是否处理完毕。而其它线程会向缓冲区逐一拿到任务一个个来处理,处理完后,放到自己的缓冲区,待前台线程取出并响应。当生产者数据量巨大时,前台缓冲区会把数据都放在一起,让其它线程根据自身性能条件逐一处理,并不会出现压垮的情况。
(命令式编程与响应式编程的区别)
1.上部分表示命令式编程,假设有4个线程,但同时来了100个请求,那么每一个线程就要同时处理25个请求,而单个线程并非顺序的处理这25个请求,而是在同一时间周期内同步的处理,使得线程会不断地在这25个请求代码中来回切换,既浪费内存也浪费时间。
2.下部分表示响应式编程,假设有4个线程,但同时来了100个请求,响应式编程会使用一个线程作为管理者,管理这100个请求,剩下的3个线程向管理线程逐一获取请求顺序处理,处理完再获取。解决线程来回切换,且同时开启多份内存开支等问题。
(响应式编程的缓冲区作用)
1.上部分表示传统的命令式编程,来200个请求就得开启200个线程(如果可以的话),大量浪费线程
2.下部分表示响应式编程的处理模式,假设线程向“任务缓冲区”获得一条请求并处理,在处理完成后,把结果放到“完成缓冲区”中,不等待接着继续往“任务缓冲区”中获取任务并处理,处理完成后继续放到“完成缓冲区”中,缓冲区可同时存放多个任务数据。
(Reactive Stream整体流程)
1.上部分:发布者与订阅者绑定订阅关系,发布者发布数据到缓冲区,订阅者向发布者请求获得数据并处理,处现完成后放到缓冲区等待发布者验收。
2.下部分:当发布者的数据需要多个工序进行处理时,就出现了“处理器”对象,通过发布者-操作1绑定,操作1-操作2绑定,操作2-操作3绑定,操作3-订阅者绑定的方式,形成一条职责链,对数据进行多方加工。同时,每一个绑定关系中,都有独立之间的缓冲区,用于它们之间的任务与完成操作的异步操作。
代码示例
我们需要准备4个对象,这4个对象都是接口,需要我们来定义【发布者】、【订阅者】、【处理器】及【绑定关系】
创建发布者
Flow.Publisher publisher = new Flow.Publisher() {
// 发布者收到订阅者时的操作
@Override
public void subscribe(Flow.Subscriber subscriber) {
}
};
我们可以使用实现类来创建发布者
// 创建 SubmissionPublisher 发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher(); // 需要定义泛型,用以声明发布任务的数据类型
创建订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() { // 需要定义泛型,用以声明处理数据的类型
private Flow.Subscription subscription;
/**
* 订阅者,当订阅绑定成功后的方法
* @param subscription 订阅关系对象
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系
this.subscription = subscription;
// 向发布者获取一个任务
this.subscription.request(1);
}
/**
* 任务到达时
* @param item 任务传来的数据
*/
@Override
public void onNext(String item) {
// 收到任务时进行处理
System.out.println(item);
// 处理完上一任务后,接着获取下一个任务
this.subscription.request(1);
}
/**
* 当发布者或处理任务时抛出异常,这里也会感应得到
* @param throwable the exception
*/
@Override
public void onError(Throwable throwable) {
System.out.println("错误" + throwable.getMessage());
}
/**
* 当任务处理完时时
*/
@Override
public void onComplete() {
System.out.println("任务处理完成");
}
};
绑定订阅关系
// 发布者绑定订阅者关系
publisher.subscribe(subscriber);
发布者提交任务(需要先绑定)
publisher.submit("要处理的数据,根据泛型传数据值");
存在处理器的多层工序订阅关系
如果在一个响应式流中不止有发布者和订阅者的处理,还需要在中间添加处理器处理对象,处理器既是上家的订阅者,也是下家的发布者,同时具有发布者和订阅者的能力。
/**
* 创建一个【处理器】类,因为它既具有发布者,也具有订阅者,所以我们继承 SubmissionPublisher 类来充当发布者进行发布任务
* 而订阅者逻辑我们自己写
*/
public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
private Flow.Subscriber subscriber;
/**
* 订阅者,当订阅绑定成功后的方法
*
* @param subscription 订阅关系对象
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系
this.subscription = subscription;
// 向发布者获取一个任务
this.subscription.request(1);
}
/**
* 任务到达时
*
* @param item 任务传来的数据
*/
@Override
public void onNext(String item) {
// 收到任务时进行处理
System.out.println(item);
// 处理器作为发布者,发布处理完的数据作为下一个处理器的任务
submit(item);
// 处理器作为订阅者,处理完上一任务后,接着获取下一个任务
this.subscription.request(1);
}
/**
* 当发布者或处理任务时抛出异常,这里也会感应得到
*
* @param throwable the exception
*/
@Override
public void onError(Throwable throwable) {
System.out.println("错误" + throwable.getMessage());
}
/**
* 当任务处理完时时
*/
@Override
public void onComplete() {
System.out.println("任务处理完成");
// 作为发布者,处理完成后关闭线程
close();
}
}
其中一个接口方法:
public void subscribe(Subscriber<? super T> subscriber);
这个是给你一个成功绑定关系的订阅者对象,让你去处理怎么把这个对象进行保存并操作发布功能,是比较复杂的,因此我们只要用现有的实现类就可以了。
处理器之间的互想绑定关系
Flow.Processor<String, String> processor1 = new MyProcessor(); // 创建处理器1
Flow.Processor<String, String> processor2 = new MyProcessor(); // 创建处理器2
Flow.Processor<String, String> processor3 = new MyProcessor(); // 创建处理器3
// 发布者绑定与处理器的订阅关系
publisher.subscribe(processor1); // 发布者绑定处理器1
processor1.subscribe(processor2); // 处理器1绑定处理器2
processor2.subscribe(processor3); // 处理器2绑定处理器3
// 处理器3绑定订阅者
processor3.subscribe(subscriber);
// 发布者发布任务
publisher.submit("aaa");




共有 0 条评论