flyEn'blog

Spring5之使用Spring WebFlux开发响应式应用

本文参考:https://blog.51cto.com/liukang/2090163

Spring5之使用Spring WebFlux开发响应式应用。

1.lambda与函数式

2.Reactor 3响应式编程库

3.Spring Webflux和Spring Data Reactive

Reactor 3 响应式编程库

👉Reactor3参考文档

Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

我们先把练习用的项目搭起来。先创建一个maven项目,然后添加依赖:

1
2
3
4
5
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>

最新版本可到http://search.maven.org查询,复制过来即可。另外出于测试的需要,添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

Flux和Mono

Reactor中的发布者(Publisher)由Flux和Mono两个类定义,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0..N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0..1)元素的结果。

既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。

下图所示就是一个Flux类型的数据流,黑色箭头是时间轴。它连续发出“1” - “6”共6个元素值,以及一个完成信号(图中⑥后边的加粗竖线来表示),完成信号告知订阅者数据流已经结束。

Flux.just(1, 2, 3, 4, 5, 6)

image-20200528112044589

下图所示是一个Mono类型的数据流,它发出一个元素值后,又发出一个完成信号。

Mono.just(1)

image-20200528112126526

既然Flux具有发布一个数据元素的能力,为什么还要专门定义一个Mono类呢?举个例子,一个HTTP请求产生一个响应,所以对其进行“count”操作是没有多大意义的。表示这样一个结果的话,应该用Mono<HttpResponse>而不是 Flux<HttpResponse>,对应的操作通常只用于处理 0/1 个元素。它们从语义上就原生包含着元素个数的信息,从而避免了对Mono对象进行多元素场景下的处理。

有些操作可以改变基数,从而需要切换类型。比如,count操作用于Flux,但是操作返回的结果是Mono<Long>

对于Flux,还可以通过如下方式声明(分别基于数组、集合和Stream生成):

1
2
3
4
5
6
Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);

三种信号都不是一定要具备的:

  • 首先,错误信号和完成信号都是终止信号,二者不可能同时共存
  • 如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流
  • 如果没有错误信号和完成信号,那么就是一个无限数据流

    比如,对于只有完成/错误信号的数据流:

1
2
3
4
5
6
7
8
// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());
// 只有错误信号的数据流
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));

订阅前什么都不会发生

数据流有了,假设我们想把每个数据元素原封不动地打印出来:

1
2
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);   // 输出 123456
Mono.just(1).subscribe(System.out::println); // 输出 1

可见,subscribe方法中的lambda表达式作用在了每一个数据元素上。此外,Flux和Mono还提供了多个subscribe方法的变体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

比如订阅上面声明的Flux:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Flux.just(1, 2, 3, 4, 5, 6).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);

// 输出:
1
2
3
4
5
6
Completed!

// 举一个有错误信号的例子:
Mono.error(new Exception("some error")).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);
// 输出:
java.lang.Exception: some error

这里需要注意的一点是,Flux.just(1, 2, 3, 4, 5, 6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流。所以,订阅前什么都不会发生

测试与调试

从命令式和同步式编程切换到响应式和异步式编程有时候是令人生畏的。学习曲线中最陡峭的地方就是出错时如何分析和调试

在命令式世界,调试通常都是非常直观的:直接看 stack trace 就可以找到问题出现的位置, 以及其他信息:是否问题责任全部出在你自己的代码?问题是不是发生在某些库代码?如果是, 那你的哪部分代码调用了库,是不是传参不合适导致的问题?等等。

当你切换到响应式的异步代码,事情就变得复杂的多了。不过我们先不接触过于复杂的内容,先了解一个基本的单元测试工具——StepVerifier

最常见的测试 Reactor 序列的场景就是定义一个 Flux 或 Mono,然后在订阅它的时候测试它的行为。

当你的测试关注于每一个数据元素的时候,就非常贴近使用 StepVerifier 的测试场景: 下一个期望的数据或信号是什么?你是否期望使用 Flux 来发出某一个特别的值?或者是否接下来 300ms 什么都不做?——所有这些都可以使用 StepVerifier API 来表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Flux<Integer> generateFluxFrom1To6() {
return Flux.just(1, 2, 3, 4, 5, 6);
}
private Mono<Integer> generateMonoWithError() {
return Mono.error(new Exception("some error"));
}
@Test
public void testViaStepVerifier() {
StepVerifier.create(generateFluxFrom1To6())
.expectNext(1, 2, 3, 4, 5, 6)
.expectComplete()
.verify();
StepVerifier.create(generateMonoWithError())
.expectErrorMessage("some error")
.verify();
}

其中,expectNext用于测试下一个期望的数据元素,expectErrorMessage用于校验下一个元素是否为错误信号,expectComplete用于测试下一个元素是否为完成信号。

操作符(Operator)

通常情况下,我们需要对源发布者发出的原始数据流进行多个阶段的处理,并最终得到我们需要的数据。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐由原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于源发布者,消费者就相当于订阅者,流水线上的一道道工序就相当于一个一个的操作符(Operator)。

1)map-元素映射为新元素

image-20200528114457072

上图是Flux的map操作示意图,上方的箭头是原始序列的时间轴,下方的箭头是经过map处理后的数据序列时间轴。

map接受一个Function的函数式接口为参数,这个函数式的作用是定义转换操作的策略。举例说明:

1
2
3
4
StepVerifier.create(Flux.range(1, 6)    // 1
.map(i -> i * i)) // 2
.expectNext(1, 4, 9, 16, 25, 36) //3
.expectComplete(); // 4
  1. Flux.range(1, 6)用于生成从“1”开始的,自增为1的“6”个整型数据;
  2. map接受lambdai -> i * i为参数,表示对每个数据进行平方;
  3. 验证新的序列的数据;
  4. verifyComplete()相当于expectComplete().verify()

2)flatMap-元素映射成流

flatMap操作可以将每个数据元素转换/映射为一个流,然后将这些流合并为一个大的数据流。

image-20200528114910438

注意到,流的合并是异步的,先来先到,并非是严格按照原始序列的顺序(如图蓝色和红色方块是交叉的)。

flatMap也是接收一个Function的函数式接口为参数,这个函数式的输入为一个T类型数据值,对于Flux来说输出可以是Flux和Mono,对于Mono来说输出只能是Mono。举例说明:

1
2
3
4
5
6
7
StepVerifier.create(
Flux.just("flux", "mono")
.flatMap(s -> Flux.fromArray(s.split("\\s*")) // 1
.delayElements(Duration.ofMillis(100))) // 2
.doOnNext(System.out::print)) // 3
.expectNextCount(8) // 4
.verifyComplete();
  1. 对于每一个字符串s,将其拆分为包含一个字符的字符串流;
  2. 对每个元素延迟100ms;
  3. 对每个元素进行打印(注doOnNext方法是“偷窥式”的方法,不会消费数据流);
  4. 验证是否发出了8个元素。

3)filter-过滤

filter操作可以对数据元素进行筛选。

image-20200528115452439

filter接受一个Predicate的函数式接口为参数,这个函数式的作用是进行判断并返回boolean。举例说明:

1
2
3
4
5
StepVerifier.create(Flux.range(1, 6)
.filter(i -> i % 2 == 1) // 1
.map(i -> i * i))
.expectNext(1, 9, 25) // 2
.verifyComplete();
  1. filter的lambda参数表示过滤操作将保留奇数;
  2. 验证仅得到奇数的平方。

4)zip-一对一合并

看到zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来。zip有多个方法变体,我们介绍一个最常见的二合一的。

image-20200528115732342

它对两个Flux/Mono流每次各取一个元素,合并为一个二元组(Tuple2)。

举个例子,假设我们有一个关于zip方法的说明:“Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.”,我们希望将这句话拆分为一个一个的单词并以每200ms一个的速度发出,除了前面flatMap的例子中用到的delayElements,可以如下操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Flux<String> getZipDescFlux() {
String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
return Flux.fromArray(desc.split("\\s+")); // 1
}

@Test
public void testSimpleOperators() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1); // 2
Flux.zip(
getZipDescFlux(),
Flux.interval(Duration.ofMillis(200))) // 3
.subscribe(t -> System.out.println(t.getT1()), null, countDownLatch::countDown); // 4
countDownLatch.await(10, TimeUnit.SECONDS); // 5
}

将英文说明用空格拆分为字符串流; 定义一个CountDownLatch,初始为1,则会等待执行1次countDown方法后结束,不使用它的话,测试方法所在的线程会直接返回而不会等待数据流发出完毕; 使用Flux.interval声明一个每200ms发出一个元素的long数据流;因为zip操作是一对一的,故而将其与字符串流zip之后,字符串流也将具有同样的速度; zip之后的流中元素类型为Tuple2,使用getT1方法拿到字符串流的元素;定义完成信号的处理为countDown; countDownLatch.await(10, TimeUnit.SECONDS)会等待countDown倒数至0,最多等待10秒钟。

除了zip静态方法之外,还有zipWith等非静态方法,效果与之类似:

1
getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))

在异步条件下,数据流的流速不同,使用zip能够一对一地将两个或多个数据流的元素对齐发出。

5)更多

Reactor中提供了非常丰富的操作符,除了以上几个常见的,还有:

  • 用于编程方式自定义生成数据流的creategenerate等及其变体方法;
  • 用于“无副作用的peek”场景的doOnNextdoOnErrordoOncompletedoOnSubscribedoOnCancel等及其变体方法;
  • 用于数据流转换的whenand/ormergeconcatcollectcountrepeat等及其变体方法;
  • 用于过滤/拣选的takefirstlastsampleskiplimitRequest等及其变体方法;
  • 用于错误处理的timeoutonErrorReturnonErrorResumedoFinallyretryWhen等及其变体方法;
  • 用于分批的windowbuffergroup等及其变体方法;
  • 用于线程调度的publishOnsubscribeOn方法。

使用这些操作符,你几乎可以搭建出能够进行任何业务需求的数据处理管道/流水线。

详细可阅读Reactor3参考文档

调度器与线程模型

在Reactor中,对于多线程并发调度的处理变得异常简单。

在以往的多线程开发场景中,我们通常使用Executors工具类来创建线程池,通常有如下四种类型:

  • newCachedThreadPool创建一个弹性大小缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;
  • newFixedThreadPool创建一个大小固定的线程池,可控制线程最大并发数,超出的线程会在队列中等待;
  • newScheduledThreadPool创建一个大小固定的线程池,支持定时及周期性的任务执行;
  • newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

此外,newWorkStealingPool还可以创建支持work-stealing的线程池。

说良心话,Java提供的Executors工具类使得我们对ExecutorService使用已经非常得心应手了。BUT~ Reactor让线程管理和任务调度更加“傻瓜”——调度器(Scheduler)帮助我们搞定这件事。Scheduler是一个拥有多个实现类的抽象接口。Schedulers类提供的静态方法可搭建以下几种线程执行环境:

  • 当前线程(Schedulers.immediate());
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle()
  • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
  • 固定大小线程池(Schedulers.parallel()),所创建线程池的大小与CPU个数等同;
  • 自定义线程池(Schedulers.fromExecutorService(ExecutorService))基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。

Schedulers类已经预先创建了几种常用的线程池:使用single()elastic()parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()newElastic()newParallel()方法。

Executors提供的几种线程池在Reactor中都支持:

  • Schedulers.single()Schedulers.newSingle()对应Executors.newSingleThreadExecutor()
  • Schedulers.elastic()Schedulers.newElastic()对应Executors.newCachedThreadPool()
  • Schedulers.parallel()Schedulers.newParallel()对应Executors.newFixedThreadPool()

举例:将同步的阻塞调用变为异步的

前面介绍到Schedulers.elastic()能够方便地给一个阻塞的任务分配专门的线程,从而不会妨碍其他任务和资源。我们就可以利用这一点将一个同步阻塞的调用调度到一个自己的线程中,并利用订阅机制,待调用结束后异步返回。

假设我们有一个同步阻塞的调用方法:

1
2
3
4
5
6
7
8
private String getStringSync() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Reactor!";
}

正常情况下,调用这个方法会被阻塞2秒钟,然后同步地返回结果。我们借助elastic调度器将其变为异步,由于是异步的,为了保证测试方法所在的线程能够等待结果的返回,我们使用CountDownLatch

1
2
3
4
5
6
7
8
@Test
public void testSyncToAsync() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(() -> getStringSync()) // 1
.subscribeOn(Schedulers.elastic()) // 2
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}
  1. 使用fromCallable声明一个基于Callable的Mono;
  2. 使用subscribeOn将任务调度到Schedulers内置的弹性线程池执行,弹性线程池会为Callable的执行任务分配一个单独的线程。

切换调度器的操作符

Reactor 提供了两种在响应式链中调整调度器 Scheduler的方法:publishOnsubscribeOn。它们都接受一个 Scheduler作为参数,从而可以改变调度器。但是publishOn在链中出现的位置是有讲究的,而subscribeOn 则无所谓。

image-20200528141933004

1
2
3
4
5
6
// 假设与上图对应的代码是:
Flux.range(1, 1000)
.map(…)
.publishOn(Schedulers.elastic()).filter(…)
.publishOn(Schedulers.parallel()).flatMap(…)
.subscribeOn(Schedulers.single())
  • 如图所示,publishOn会影响链中其后的操作符,比如第一个publishOn调整调度器为elastic,则filter的处理操作是在弹性线程池中执行的;同理,flatMap是执行在固定大小的parallel线程池中的;
  • subscribeOn无论出现在什么位置,都只影响源头的执行环境,也就是range方法是执行在单线程中的,直至被第一个publishOn切换调度器之前,所以range后的map也在单线程中执行。

错误处理

在响应式流中,错误(error)是终止信号。当有错误发生时,它会导致流序列停止,并且错误信号会沿着操作链条向下传递,直至遇到subscribe中的错误处理方法。这样的错误还是应该在应用层面解决的。否则,你可能会将错误信息显示在用户界面,或者通过某个REST endpoint发出。所以还是建议在subscribe时通过错误处理方法妥善解决错误。

1
2
3
4
5
6
7
@Test
public void testErrorHandling() {
Flux.range(1, 6)
.map(i -> 10/(i-3)) // 1
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
}

当i为3时会导致异常。

输出为:

1
2
3
25
100
java.lang.ArithmeticException: / by zero //注:这一行是红色,表示标准错误输出

subscribe方法的第二个参数定义了对错误信号的处理,从而测试方法exit为0(即正常退出),可见错误没有蔓延出去。不过这还不够~

此外,Reactor还提供了其他的用于在链中处理错误的操作符(error-handling operators),使得对于错误信号的处理更加及时,处理方式更加多样化。

在讨论错误处理操作符的时候,我们借助命令式编程风格的 try 代码块来作比较。我们都很熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括如下几种:

  1. 捕获并返回一个静态的缺省值。
  2. 捕获并执行一个异常处理方法或动态计算一个候补值来顶替。
  3. 捕获,并再包装为某一个 业务相关的异常,然后再抛出业务异常。
  4. 捕获,记录错误日志,然后继续抛出。
  5. 使用 finally 来清理资源,或使用 Java 7 引入的 “try-with-resource”。

以上所有这些在 Reactor 都有相应的基于 error-handling 操作符处理方式。

1. 捕获并返回一个静态的缺省值。

onErrorReturn方法能够在收到错误信号的时候提供一个缺省值:

1
2
3
4
5
6
Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorReturn(0) // 1
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
// 当发生异常时提供一个缺省值0

2. 捕获并执行一个异常处理方法或计算一个候补值来顶替

onErrorResume方法能够在收到错误信号的时候提供一个新的数据流:

1
2
3
4
5
6
7
8
9
10
11
12
Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorResume(e -> Mono.just(new Random().nextInt(6))) // 提供新的数据流
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);

// 举一个更有业务含义的例子:
// 1. 调用外部服务。
// 2. 如果外部服务异常,则从缓存中取值代替。
Flux.just(endpoint1, endpoint2)
.flatMap(k -> callExternalService(k)) // 1
.onErrorResume(e -> getFromCache(k)); // 2

3. 捕获,并再包装为某一个业务相关的异常,然后再抛出业务异常

有时候,我们收到异常后并不想立即处理,而是会包装成一个业务相关的异常交给后续的逻辑处理,可以使用onErrorMap方法:

1
2
3
4
5
6
7
8
9
10
Flux.just("timeout1")
.flatMap(k -> callExternalService(k)) // 1
.onErrorMap(original -> new BusinessException("SLA exceeded", original)); // 2
// 1. 调用外部服务
// 2. 如果外部服务异常,将其包装为业务相关的异常后再次抛出
// 相当于onErrorResume如下实现:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("SLA exceeded", original));

4. 捕获,记录错误日志,然后继续抛出

如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用doOnError 方法。前面提到,形如doOnXxx是只读的,对数据流不会造成影响:

1
2
3
4
5
6
Flux.just(endpoint1, endpoint2)
.flatMap(k -> callExternalService(k))
.doOnError(e -> { // 1
log("uh oh, falling back, service failed for key " + k); // 2
})
.onErrorResume(e -> getFromCache(k));

5. 使用 finally 来清理资源,或使用 Java 7 引入的 “try-with-resource”

1
2
3
4
5
Flux.using(
() -> getResource(), // 1
resource -> Flux.just(resource.getAll()), // 2
MyResource::clean // 3
);
  1. 第一个参数获取资源;
  2. 第二个参数利用资源生成数据流;
  3. 第三个参数最终清理资源。

另一方面, doFinally在序列终止(无论是 onComplete、onError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消),以便进行针对性的清理。如:

1
2
3
4
5
6
7
8
9
LongAdder statsCancel = new LongAdder();    // 1

Flux<String> flux =
Flux.just("foo", "bar")
.doFinally(type -> {
if (type == SignalType.CANCEL) // 2
statsCancel.increment(); // 3
})
.take(1); //
  1. LongAdder进行统计;
  2. doFinallySignalType检查了终止信号的类型;
  3. 如果是取消,那么统计数据自增;
  4. take(1)能够在发出1个元素后取消流。

重试

还有一个用于错误处理的操作符你可能会用到,就是retry,见文知意,用它可以对出现错误的序列进行重试。

请注意:retry对于上游Flux是采取的重订阅(re-subscribing)的方式,因此重试之后实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的。举例如下:

1
2
3
4
5
Flux.range(1, 6)
.map(i -> 10 / (3 - i))
.retry(1)
.subscribe(System.out::println, System.err::println);
Thread.sleep(100); // 确保序列执行完

输出如下:

1
2
3
4
5
5
10
5
10
java.lang.ArithmeticException: / by zero

可见,retry不过是再一次从新订阅了原始的数据流,从1开始。第二次,由于异常再次出现,便将异常传递到下游了。

回压

前边的例子并没有进行流量控制,也就是,当执行.subscribe(System.out::println)这样的订阅的时候,直接发起了一个无限的请求(unbounded request),就是对于数据流中的元素无论快慢都“照单全收”。

subscribe方法还有一个变体:

1
2
// 接收一个Subscriber为参数,该Subscriber可以进行更加灵活的定义
subscribe(Subscriber subscriber)

注:其实这才是subscribe方法本尊,前边介绍到的可以接收0~4个函数式接口为参数的subscribe最终都是拼装为这个方法,所以按理说前边的subscribe方法才是“变体”。

我们可以通过自定义具有流量控制能力的Subscriber进行订阅。Reactor提供了一个BaseSubscriber,我们可以通过扩展它来定义自己的Subscriber。

假设,我们现在有一个非常快的Publisher——Flux.range(1, 6),然后自定义一个每秒处理一个数据元素的慢的Subscriber,Subscriber就需要通过request(n)的方法来告知上游它的需求速度。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void testBackpressure() {
Flux.range(1, 6) // 1
.doOnRequest(n -> System.out.println("Request " + n + " values...")) // 2
.subscribe(new BaseSubscriber<Integer>() { // 3
@Override
protected void hookOnSubscribe(Subscription subscription) { // 4
System.out.println("Subscribed and make a request...");
request(1); // 5
}

@Override
protected void hookOnNext(Integer value) { // 6
try {
TimeUnit.SECONDS.sleep(1); // 7
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Get value [" + value + "]"); // 8
request(1); // 9
}
});
}
  1. Flux.range是一个快的Publisher;
  2. 在每次request的时候打印request个数;
  3. 通过重写BaseSubscriber的方法来自定义Subscriber;
  4. hookOnSubscribe定义在订阅的时候执行的操作;
  5. 订阅时首先向上游请求1个元素;
  6. hookOnNext定义每次在收到一个元素的时候的操作;
  7. sleep 1秒钟来模拟慢的Subscriber;
  8. 打印收到的元素;
  9. 每次处理完1个元素后再请求1个。

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Subscribed and make a request...
Request 1 values...
Get value [1]
Request 1 values...
Get value [2]
Request 1 values...
Get value [3]
Request 1 values...
Get value [4]
Request 1 values...
Get value [5]
Request 1 values...
Get value [6]
Request 1 values...

这6个元素是以每秒1个的速度被处理的。由此可见range方法生成的Flux采用的是缓存的回压策略,能够缓存下游暂时来不及处理的元素。

总结

以上关于Reactor的介绍主要是概念层面和使用层面的介绍,不过应该也足以应对常见的业务环境了。

从命令式编程到响应式编程的切换并不是一件容易的事,需要一个适应的过程。不过相信你通过本节的了解和实操,已经可以体会到使用Reactor编程的一些特点

  • 相对于传统的基于回调和Future的异步开发方式,响应式编程更加具有可编排性和可读性,配合lambda表达式,代码更加简洁,处理逻辑的表达就像装配“流水线”,适用于对数据流的处理;
  • 在订阅(subscribe)时才触发数据流,这种数据流叫做“冷”数据流,就像插座插上电器才会有电流一样,还有一种数据流不管是否有订阅者订阅它都会一直发出数据,称之为“热”数据流,Reactor中几乎都是“冷”数据流;
  • 调度器对线程管理进行更高层次的抽象,使得我们可以非常容易地切换线程执行环境;
  • 灵活的错误处理机制有利于编写健壮的程序;
  • “回压”机制使得订阅者可以无限接受数据并让它的源头“满负荷”推送所有的数据,也可以通过使用request方法来告知源头它一次最多能够处理 n 个元素,从而将“推送”模式转换为“推送+拉取”混合的模式。 后续随着对Reactor的了解我们还会逐渐了解它更多的好玩又好用的特性。

Reactor的开发者中也有来自RxJava的大牛,因此Reactor中甚至许多方法名都是来自RxJava的API的,学习了Reactor之后,很轻松就可以上手Rx家族的库了。

Spring WebFlux

Spring WebFlux是随Spring 5推出的响应式Web框架。

1)服务端技术栈

Spring提供了完整的支持响应式的服务端技术栈。

如上图所示,左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈,

  • Spring WebFlux是基于响应式流的,因此可以用来建立异步的、非阻塞的、事件驱动的服务。它采用Reactor作为首选的响应式流的实现库,不过也提供了对RxJava的支持。
  • 由于响应式编程的特性,Spring WebFlux和Reactor底层需要支持异步的运行环境,比如Netty和Undertow;也可以运行在支持异步I/O的Servlet 3.1的容器之上,比如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。
  • 从图的纵向上看,spring-webflux上层支持两种开发模式:
    • 类似于Spring WebMVC的基于注解(@Controller@RequestMapping)的开发模式;
    • Java 8 lambda 风格的函数式开发模式。
  • Spring WebFlux也支持响应式的Websocket服务端开发。

由此看来,Spring WebFlux与Vert.x有一些相通之处,都是建立在非阻塞的异步I/O和事件驱动的基础之上的。

2)响应式http客户端

此外,Spring WebFlux也提供了一个响应式的Http客户端API WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应。其底层也是由Netty提供的异步支持。

我们可以把WebClient看做是响应式的RestTemplate,与后者相比,前者:

  • 是非阻塞的,可以基于少量的线程处理更高的并发;
  • 可以使用Java 8 lambda表达式;
  • 支持异步的同时也可以支持同步的使用方式;
  • 可以通过数据流的方式与服务端进行双向通信。

当然,与服务端对应的,Spring WebFlux也提供了响应式的Websocket客户端API。

本节,我们仍然是本着“Hello,world!”的精神来上手熟悉WebFlux,因此暂时不会像手册一样面面俱到地谈到WebFlux的各个细节,我们通过以下几个例子来了解它:

  1. 先介绍一下使用Spring WebMVC风格的基于注解的方式如何编写响应式的Web服务,这几乎没有学习成本,非常赞。虽然这种方式在开发上与Spring WebMVC变化不大,但是框架底层已经是完全的响应式技术栈了;
  2. 再进一步介绍函数式的开发模式;
  3. 简单几行代码实现服务端推送(Server Send Event,SSE);
  4. 然后我们再加入响应式数据库的支持(使用Reactive Spring Data for MongoDB);
  5. 使用WebClient与前几步做好的服务端进行通信;
  6. 最后我们看一下如何通过“流”的方式在Http上进行通信。

Spring Boot 2是基于Spring 5的,其中一个比较大的更新就在于支持包括spring-webflux和响应式的spring-data在内的响应式模块。Spring Boot 2即将发布正式版,不过目前的版本从功能上已经完备,下边的例子我们就用Spring Boot 2在进行搭建。

基于WebMVC注解的方式

我们首先用Spring WebMVC开发一个只有Controller层的简单的Web服务,然后仅仅做一点点调整就可切换为基于Spring WebFlux的具有同样功能的Web服务。

我们使用Spring Boot 2搭建项目框架。

1)基于Spring Initializr创建项目

本节的例子很简单,不涉及Service层和Dao层,因此只选择spring-webmvc即可,也就是“Web”的starter。

也可以使用网页版的https://start.spring.io来创建项目:

创建后的项目POM中,包含下边的依赖,即表示基于Spring WebMVC:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

2)创建Controller和Endpoint

创建Controller类HelloController,仅提供一个Endpoint:/hello

1
2
3
4
5
6
7
8
@RestController
public class HelloController {

@GetMapping("/hello")
public String hello() {
return "Welcome to reactive world ~";
}
}

3)启动应用

OK了,一个简单的基于Spring WebMVC的Web服务。我们新增了HelloController.java,修改了application.properties

image-20200528154935726

使用IDE启动应用,或使用maven命令:

1
mvn spring-boot:run

测试Endpoint。在浏览器中访问http://localhost:8080/hello,或运行命令:

1
curl http://localhost:8080/hello

返回Welcome to reactive world ~

基于Spring WebFlux的项目与上边的步骤一致,仅有两点不同。我们这次偷个懒,就不从新建项目了,修改一下上边的项目:

4)依赖“Reactive Web”的starter是webflux而不是“Web”

修改项目POM,调整依赖使其基于Spring WebFlux:

1
2
3
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId> <!--【改】增加“flux”四个字符-->

5)Controller中处理请求的返回类型采用响应式类型

1
2
3
4
5
6
7
8
@RestController
public class HelloController {

@GetMapping("/hello")
public Mono<String> hello() { // 【改】返回类型为Mono<String>
return Mono.just("Welcome to reactive world ~"); // 【改】使用Mono.just生成响应式数据
}
}

6)启动应用

仅需要上边两步就改完了,是不是很简单,同样的方法启动应用。启动后发现应用运行于Netty上:

访问http://localhost:8080/hello,结果与Spring WebMVC的相同。

7)总结

从上边这个非常非常简单的例子中可以看出,Spring真是用心良苦,WebFlux提供了与之前WebMVC相同的一套注解来定义请求的处理,使得Spring使用者迁移到响应式开发方式的过程变得异常轻松。

虽然我们只修改了少量的代码,但是其实这个简单的项目已经脱胎换骨了。整个技术栈从命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】变成了响应式的、异步非阻塞的【spring-webflux + Reactor + Netty】。

Netty是一套异步的、事件驱动的网络应用程序框架和工具,能够开发高性能、高可靠性的网络服务器和客户端程序,因此与同样是异步的、事件驱动的响应式编程范式一拍即合。

下边的内容了解即可,就不实战了。 在Java 7推出异步I/O库,以及Servlet3.1增加了对异步I/O的支持之后,Tomcat等Servlet容器也随后开始支持异步I/O,然后Spring WebMVC也增加了对Reactor库的支持,所以上边第4)步如果不是将spring-boot-starter-web替换为spring-boot-starter-WebFlux,而是增加reactor-core的依赖的话,仍然可以用注解的方式开发基于Tomcat的响应式应用。

WebFlux的函数式开发模式

既然是响应式编程了,有些朋友可能会想统一用函数式的编程风格,WebFlux满足你。WebFlux提供了一套函数式接口,可以用来实现类似MVC的效果。我们先接触两个常用的。

再回头瞧一眼上边例子中我们用Controller定义定义对Request的处理逻辑的方式,主要有两个点:

  1. 方法定义处理逻辑;
  2. 然后用@RequestMapping注解定义好这个方法对什么样url进行响应。

在WebFlux的函数式开发模式中,我们用HandlerFunctionRouterFunction来实现上边这两点。

  • HandlerFunction相当于Controller中的具体处理方法,输入为请求,输出为装在Mono中的响应:

    1
    Mono<T extends ServerResponse> handle(ServerRequest request);
  • RouterFunction,顾名思义,路由,相当于@RequestMapping,用来判断什么样的url映射到那个具体的HandlerFunction,输入为请求,输出为装在Mono里边的Handlerfunction

    1
    Mono<HandlerFunction<T>> route(ServerRequest request);

我们看到,在WebFlux中,请求和响应不再是WebMVC中的ServletRequestServletResponse,而是ServerRequestServerResponse。后者是在响应式编程中使用的接口,它们提供了对非阻塞和回压特性的支持,以及Http消息体与响应式类型Mono和Flux的转换方法。

下面我们用函数式的方式开发两个Endpoint:

  1. time返回当前的时间;
  2. /date返回当前的日期。

对于这两个需求,HandlerFunction很容易写:

1
2
3
4
5
6
7
8
9
// 返回包含时间字符串的ServerResponse
HandlerFunction<ServerResponse> timeFunction =
request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);

// 返回包含日期字符串的ServerResponse
HandlerFunction<ServerResponse> dateFunction =
request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);

那么RouterFunction为:

1
2
3
RouterFunction<ServerResponse> router = 
RouterFunctions.route(GET("/time"), timeFunction)
.andRoute(GET("/date"), dateFunction);

不过这么写在业务逻辑复杂的时候不太好组织,我们通常采用跟MVC类似的代码组织方式,将同类业务的HandlerFunction放在一个类中,然后在Java Config中将RouterFunction配置为Spring容器的Bean。我们继续在第一个例子的代码上开发:

1)创建统一存放处理时间的Handler类

创建TimeHandler.java

1
2
3
4
5
6
7
8
9
10
11
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class TimeHandler {
public Mono<ServerResponse> getTime(ServerRequest serverRequest) {
return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);
}
public Mono<ServerResponse> getDate(ServerRequest serverRequest) {
return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
}
}

由于出现次数通常比较多,这里静态引入ServerResponse.ok()方法。

2)在Spring容器配置RouterFunction

我们采用Spring现在比较推荐的Java Config的配置Bean的方式,创建用于存放Router的配置类RouterConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {
@Autowired
private TimeHandler timeHandler;

@Bean
public RouterFunction<ServerResponse> timerRouter() {
return route(GET("/time"), req -> timeHandler.getTime(req))
.andRoute(GET("/date"), timeHandler::getDate); // 这种方式相对于上一行更加简洁
}
}

3)重启服务试一试

1
2
3
4
5
$ curl http://localhost:8080/date
Today is 2018-02-26

$ curl http://localhost:8080/time
Now is 21:12:53

服务器推送

我们可能会遇到一些需要网页与服务器端保持连接(起码看上去是保持连接)的需求,比如类似微信网页版的聊天类应用,比如需要频繁更新页面数据的监控系统页面或股票看盘页面。我们通常采用如下几种技术:

  • 短轮询:利用ajax定期向服务器请求,无论数据是否更新立马返回数据,高并发情况下可能会对服务器和带宽造成压力;
  • 长轮询:利用comet不断向服务器发起请求,服务器将请求暂时挂起,直到有新的数据的时候才返回,相对短轮询减少了请求次数;
  • SSE:服务端推送(Server Send Event),在客户端发起一次请求后会保持该连接,服务器端基于该连接持续向客户端发送数据,从HTML5开始加入。
  • Websocket:这是也是一种保持连接的技术,并且是双向的,从HTML5开始加入,并非完全基于HTTP,适合于频繁和较大流量的双向通讯场景。

既然响应式编程是一种基于数据流的编程范式,自然在服务器推送方面得心应手,我们基于函数式方式再增加一个Endpoint /times,可以每秒推送一次时间。

1)增加Handler方法

1
2
3
4
5
6
public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) {
return ok().contentType(MediaType.TEXT_EVENT_STREAM).body( // 1
Flux.interval(Duration.ofSeconds(1)). // 2
map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())),
String.class);
}
  1. MediaType.TEXT_EVENT_STREAM表示Content-Typetext/event-stream,即SSE;
  2. 利用interval生成每秒一个数据的流。

2)配置router

1
2
3
4
5
6
@Bean
public RouterFunction<ServerResponse> timerRouter() {
return route(GET("/time"), timeHandler::getTime)
.andRoute(GET("/date"), timeHandler::getDate)
.andRoute(GET("/times"), timeHandler::sendTimePerSec); // 增加这一行
}

3)重启服务试一下

1
2
3
4
5
6
7
curl http://localhost:8080/times
data:21:32:22
data:21:32:23
data:21:32:24
data:21:32:25
data:21:32:26
<Ctrl+C>

就酱,访问这个url会收到持续不断的报时数据(时间数据是在data中的)。

响应式Spring Data

开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API支持异步的数据库驱动。就像从自来水厂到家里水龙头这个管道中,如果任何一个环节发生了阻塞,那就可能造成整体吞吐量的下降。

各个数据库都开始陆续推出异步驱动,目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。今天我们用MongoDB来写一个响应式demo。

我们这个例子很简单,就是关于User的增删改查,以及基于注解的服务端推送。

1)编写User

既然是举例,我们随便定义几个属性吧~

1
2
3
4
5
6
7
8
public class User {
private String id;
private String username;
private String phone;
private String email;
private String name;
private Date birthday;
}

然后为了方便开发,我们引入lombok库,它能够通过注解的方式为我们添加必要的Getter/Setter/hashCode()/equals()/toString()/构造方法等,添加依赖(版本可自行到http://search.maven.org搜索最新):

1
2
3
4
5
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>

然后为User添加注解:

1
2
3
4
5
@Data   // 生成无参构造方法/getter/setter/hashCode/equals/toString
@AllArgsConstructor // 生成所有参数构造方法
@NoArgsConstructor // @AllArgsConstructor会导致@Data不生成无参构造方法,需要手动添加@NoArgsConstructor,如果没有无参构造方法,可能会导致比如com.fasterxml.jackson在序列化处理时报错
public class User {
...

可能需要先在IDE中进行少量配置以便支持lombok的注解,比如IntelliJ IDEA:

  1. 安装“lombok plugin”:settings-Plugins-🔍lombok
  2. 开启对注解编译的支持:settings-bulid,Execution,Deployment-Compiler-default-☑️Enable annotation processing

2)增加Spring Data的依赖

在POM中增加Spring Data Reactive Mongo的依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

MongoDB是文档型的NoSQL数据库,因此,我们使用@Document注解User类:

1
2
3
4
5
6
7
8
9
10
11
12
@Data
@AllArgsConstructor
@Document
public class User {
@Id
private String id; // 注解属性id为ID
@Indexed(unique = true) // 注解属性username为索引,并且不能重复
private String username;
private String name;
private String phone;
private Date birthday;
}

OK,这样我们的模型就准备好了。MongoDB会自动创建collection,默认为类名首字母小写,也就是user

3)配置数据源

Spring Boot为我们搞定了几乎所有的配置,太赞了,下边是MongoDB的默认配置:

1
2
3
4
5
6
7
8
9
10
11
# MONGODB (MongoProperties)
spring.data.mongodb.authentication-database= # Authentication database name.
spring.data.mongodb.database=test # Database name.
spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use.
spring.data.mongodb.grid-fs-database= # GridFS database name.
spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri.
spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri.
spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri.
spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories.
spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials.
spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri.

请根据需要添加自定义的配置,比如我的MongoDB是跑在IP为192.168.0.101的虚拟机的Docker中的,就可在application.properties中增加一条:

1
spring.data.mongodb.host=192.168.0.101

4)增加DAO层repository

与非响应式Spring Data的CrudReposity对应的,响应式的Spring Data也提供了相应的Repository库:ReactiveCrudReposity,当然,我们也可以使用它的子接口ReactiveMongoRepository

我们增加UserRepository

1
2
3
4
public interface UserRepository extends ReactiveCrudRepository<User, String> {  // 1
Mono<User> findByUsername(String username); // 2
Mono<Long> deleteByUsername(String username);
}
  1. 同样的,ReactiveCrudRepository的泛型分别是UserID的类型;
  2. ReactiveCrudRepository已经提供了基本的增删改查的方法,根据业务需要,我们增加四个方法

5)Service层

由于业务逻辑几乎为零,只是简单调用了DAO层,直接贴代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class UserService {
@Autowired
private UserRepository userRepository;

/**
* 保存或更新。
* 如果传入的user没有id属性,由于username是unique的,在重复的情况下有可能报错,
* 这时找到以保存的user记录用传入的user更新它。
*/
public Mono<User> save(User user) {
return userRepository.save(user)
.onErrorResume(e -> // 1
userRepository.findByUsername(user.getUsername()) // 2
.flatMap(originalUser -> { // 4
user.setId(originalUser.getId());
return userRepository.save(user); // 3
}));
}

public Mono<Long> deleteByUsername(String username) {
return userRepository.deleteByUsername(username);
}

public Mono<User> findByUsername(String username) {
return userRepository.findByUsername(username);
}

public Flux<User> findAll() {
return userRepository.findAll();
}
}
  1. onErrorResume进行错误处理;
  2. 找到username重复的记录;
  3. 拿到ID从而进行更新而不是创建;
  4. 由于函数式为User -> Publisher,所以用flatMap

6)Controller层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private UserService userService;

@PostMapping("")
public Mono<User> save(User user) {
return this.userService.save(user);
}

@DeleteMapping("/{username}")
public Mono<Long> deleteByUsername(@PathVariable String username) {
return this.userService.deleteByUsername(username);
}

@GetMapping("/{username}")
public Mono<User> findByUsername(@PathVariable String username) {
return this.userService.findByUsername(username);
}

@GetMapping("")
public Flux<User> findAll() {
return this.userService.findAll();
}
}

8)stream+json

看到这里细心的朋友可能会有点嘀咕,怎么看是不是异步的呢?毕竟查询全部的时候,结果都用中括号括起来了,这和原来返回List<User>的效果似乎没多大区别。假设一下查询100个数据,如果是异步的话,以我们对“异步响应式流”的印象似乎应该是一个一个至少是一批一批的到达客户端的嘛。我们加个延迟验证一下:

1
2
3
4
@GetMapping("")
public Flux<User> findAll() {
return this.userService.findAll().delayElements(Duration.ofSeconds(1));
}

每个元素都延迟1秒,现在我们在数据库里弄三条记录,然后请求查询全部的那个URL,发现并不是像/times一样一秒一个地出来,而是3秒之后一块儿出来的。果然如此,这一点都不响应式啊!

/times类似,我们也加一个MediaType,不过由于这里返回的是JSON,因此不能使用TEXT_EVENT_STREAM,而是使用APPLICATION_STREAM_JSON,即application/stream+json格式。

1
2
3
4
@GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> findAll() {
return this.userService.findAll().delayElements(Duration.ofSeconds(2));
}

produces后边的值应该是application/stream+json字符串,因此用APPLICATION_STREAM_JSON_VALUE

重启服务再次请求,发现三个user是一秒一个的速度出来的,中括号也没有了,而是一个一个独立的JSON值构成的json stream:

1
2
3
{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"张三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"}
{"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}
{"id":"5a955f08fa10b93ec48df37f","username":"wangwu","name":"王五","phone":"18610861865","birthday":"1995-05-04T16:00:00.000+0000"}

9)总结

如果有Spring Data开发经验的话,切换到Spring Data Reactive的难度并不高。跟Spring WebFlux类似:原来返回User的话,那现在就返回Mono;原来返回List的话,那现在就返回Flux

对于稍微复杂的业务逻辑或一些必要的异常处理,比如上边的save方法,请一定采用响应式的编程方式来定义,从而一切都是异步非阻塞的。如下图所示,从HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的针对不同server的适配器),到我们编写的Controller和DAO,以及异步数据库驱动,构成了一个完整的异步非阻塞的管道,里边流动的就是响应式流。

image-20200528164804774

使用WebClient开发响应式Http客户端

下面,我们用WebClient测试一下前边几个例子的成果。

1) /hello,返回Mono

1
2
3
4
5
6
7
8
9
10
@Test
public void webClientTest1() throws InterruptedException {
WebClient webClient = WebClient.create("http://localhost:8080"); // 1
Mono<String> resp = webClient
.get().uri("/hello") // 2
.retrieve() // 3
.bodyToMono(String.class); // 4
resp.subscribe(System.out::println); // 5
TimeUnit.SECONDS.sleep(1); // 6
}
  1. 创建WebClient对象并指定baseUrl;
  2. HTTP GET;
  3. 异步地获取response信息;
  4. 将response body解析为字符串;
  5. 打印出来;
  6. 由于是异步的,我们将测试线程sleep 1秒确保拿到response,也可以像前边的例子一样用CountDownLatch

2) /user,返回Flux

为了多演示一些不同的实现方式,下边的例子我们调整几个地方,但是效果跟上边是一样的:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void webClientTest2() throws InterruptedException {
WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build(); // 1
webClient
.get().uri("/user")
.accept(MediaType.APPLICATION_STREAM_JSON) // 2
.exchange() // 3
.flatMapMany(response -> response.bodyToFlux(User.class)) // 4
.doOnNext(System.out::println) // 5
.blockLast(); // 6
}
  1. 这次我们使用WebClientBuilder来构建WebClient对象;
  2. 配置请求Header:Content-Type: application/stream+json
  3. 获取response信息,返回值为ClientResponseretrive()可以看做是exchange()方法的“快捷版”;
  4. 使用flatMap来将ClientResponse映射为Flux;
  5. 只读地peek每个元素,然后打印出来,它并不是subscribe,所以不会触发流;
  6. 上个例子中sleep的方式有点low,blockLast方法,顾名思义,在收到最后一个元素前会阻塞,响应式业务场景中慎用。

3) /times,服务端推送

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void webClientTest3() throws InterruptedException {
WebClient webClient = WebClient.create("http://localhost:8080");
webClient
.get().uri("/times")
.accept(MediaType.TEXT_EVENT_STREAM) // 1
.retrieve()
.bodyToFlux(String.class)
.log() // 2
.take(10) // 3
.blockLast();
}
  1. 配置请求Header:Content-Type: text/event-stream,即SSE;
  2. 这次用log()代替doOnNext(System.out::println)来查看每个元素;
  3. 由于/times是一个无限流,这里取前10个,会导致流被取消

让数据在Http上双向无限流动起来

🔧下面我们实现一个这样两个Endpoint:

  • POST方法的/events,“源源不断”地收集数据,并存入数据库;
  • GET方法的/events,“源源不断”将数据库中的记录发出来。

0)准备

一、数据模型MyEvent

1
2
3
4
5
6
7
8
9
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "event") // 1
public class MyEvent {
@Id
private Long id; // 2
private String message;
}
  1. 指定collection名为event
  2. 这次我们使用表示时间的long型数据作为ID。

二、DAO层:

1
2
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { // 1
}

下边用到了可以保存Flux的insert(Flux)方法,这个方法是在ReactiveMongoRepository中定义的。

三、简单起见就不要Service层了,直接Controller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/events")
public class MyEventController {
@Autowired
private MyEventRepository myEventRepository;

@PostMapping(path = "")
public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) { // 1
// TODO
return null;
}

@GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyEvent> getEvents() { // 2
// TODO
return null;
}
}
  1. POST方法的接收数据流的Endpoint,所以传入的参数是一个Flux,返回结果其实就看需要了,我们用一个Mono作为方法返回值,表示如果传输完的话只给一个“完成信号”就OK了;
  2. GET方法的无限发出数据流的Endpoint,所以返回结果是一个Flux<MyEvent>,不要忘了注解上produces = MediaType.APPLICATION_STREAM_JSON_VALUE

1)接收数据流的Endpoint

在客户端,WebClient可以接收text/event-streamapplication/stream+json格式的数据流,也可以在请求的时候上传一个数据流到服务器; 在服务端,WebFlux也支持接收一个数据流作为请求参数,从而实现一个接收数据流的Endpoint。

我们先看服务端。Controller中的loadEvents方法:

1
2
3
4
@PostMapping(path = "", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1
public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) {
return this.myEventRepository.insert(events).then(); // 2
}
  1. 指定传入的数据是application/stream+json,与getEvents方法的区别在于这个方法是consume这个数据流;
  2. insert返回的是保存成功的记录的Flux,但我们不需要,使用then方法表示“忽略数据元素,只返回一个完成信号。

服务端写好后,启动之,再看一下客户端怎么写(还是放在src/test下):

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void webClientTest4() {
Flux<MyEvent> eventFlux = Flux.interval(Duration.ofSeconds(1))
.map(l -> new MyEvent(System.currentTimeMillis(), "message-" + l)).take(5); // 1
WebClient webClient = WebClient.create("http://localhost:8080");
webClient
.post().uri("/events")
.contentType(MediaType.APPLICATION_STREAM_JSON) // 2
.body(eventFlux, MyEvent.class) // 3
.retrieve()
.bodyToMono(Void.class)
.block();
}
  1. 声明速度为每秒一个MyEvent元素的数据流,不加take的话表示无限个元素的数据流;
  2. 声明请求体的数据格式为application/stream+json
  3. body方法设置请求体的数据。

运行一下这个测试,根据控制台数据可以看到是一条一条将数据发到/events的,看一下MongoDB中的数据:

image-20200528171159505

2)发出无限流的Endpoint

回想一下前边/user的例子,当数据库中所有的内容都查询出来之后,这个流就结束了,因为其后跟了一个“完成信号”,我们可以通过在UserServicefindAll()方法的流上增加log()操作符来观察更详细的日志:

image-20200528171236497

我们可以看到在三个onNext信号后是一个onComplete信号。

这样的流是有限流,这个时候如果在数据库中再新增一个User的话,已经结束的请求也不会再有新的内容出现了。

反观/times请求,它会无限地发出SSE,而不会有“完成信号”出现,这是无限流。

我们希望的情况是无论是请求GET的/events之后,当所有数据都发完之后,不要结束,而是挂起等待新的数据。如果我们用上边的POST的/events传入新的数据到数据库后,新的数据会自动地流到客户端。

这可以在DAO层配置实现:

1
2
3
4
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
@Tailable // 1
Flux<MyEvent> findBy(); // 2
}
  1. @Tailable注解的作用类似于linux的tail命令,被注解的方法将发送无限流,需要注解在返回值为Flux这样的多个元素的Publisher的方法上
  2. findAll()是想要的方法,但是在ReactiveMongoRepository中我们够不着,所以使用findBy()代替。

然后完成Controller中的方法:

1
2
3
4
@GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyEvent> getEvents() {
return this.myEventRepository.findBy();
}

不过,这还不够,@Tailable仅支持有大小限制的(“capped”)collection,而自动创建的collection是不限制大小的,因此我们需要先手动创建。Spring Boot提供的CommandLineRunner可以帮助我们实现这一点。

Spring Boot应用程序在启动后,会遍历CommandLineRunner接口的实例并运行它们的run方法。

1
2
3
4
5
6
7
@Bean   // 1
public CommandLineRunner initData(MongoOperations mongo) { // 2
return (String... args) -> { // 3
mongo.dropCollection(MyEvent.class); // 4
mongo.createCollection(MyEvent.class, CollectionOptions.empty().size(200).capped()); // 5
};
}
  1. 对于复杂的Bean只能通过Java Config的方式配置,这也是为什么Spring3之后官方推荐这种配置方式的原因,这段代码可以放到配置类中,本例我们就直接放到启动类WebFluxDemoApplication了;
  2. MongoOperations提供对MongoDB的操作方法,由Spring注入的mongo实例已经配置好,直接使用即可;
  3. CommandLineRunner也是一个函数式接口,其实例可以用lambda表达;
  4. 如果有,先删除collection,生产环境慎用这种操作;
  5. 创建一个记录个数为10的capped的collection,容量满了之后,新增的记录会覆盖最旧的。

OK,这个时候我们请求一下http://localhost:8080/events,发现立马返回了,并没有挂起。原因在于collection中一条记录都没有,而@Tailable起作用的前提是至少有一条记录。

跑一下WebClient测试程序插入5条数据,然后再次请求

image-20200528174106393

请求是挂起的,这没错,但是只有两条数据,看WebClient测试程序的控制台明明发出了5个请求啊。

原因定义的CollectionOptions.empty().size(200).capped()中,size指的是以字节为单位的大小,并且会向上取到256的整倍数,所以我们刚才定义的是256byte大小的collection,所以最多容纳两条记录。我们可以这样改一下:

1
CollectionOptions.empty().maxDocuments(200).size(100000).capped()

maxDocuments限制了记录条数,size限制容量且是必须定义的,因为MongoDB不像关系型数据库有严格的列和字段大小定义,鬼知道会存多大的数据进来,所以容量限制是必要的。

好了,再次启动应用,先插入5条数据,然后请求/events,收到5条记录后请求仍然挂起,在插入5条数据,curl客户端又会陆续收到新的数据。

image-20200528174245077

总结

这一节,我们对WebFlux做了一个简单的基于实例的介绍,相信你对响应式编程及其在WEB应用中如何发挥作用有了更多的体会,本章的实战是比较基础的,初衷是希望能够通过上手编写代码体会响应式编程的感觉,因为切换到响应式思维方式并非易事。

这一章的核心关键词其实翻来覆去就是:“异步非阻塞的响应式流”。我们了解了异步非阻塞的好处,也知道如何让数据流动起来,下面我们就通过对实例的性能测试,借助实实在在的数据,真切感受一下异步非阻塞的“丝滑”。

Fork me on GitHub