Ratpack:构建简单高效的 HTTP 微服务(4)异步和非阻塞
- UID
- 1066743
|
Ratpack:构建简单高效的 HTTP 微服务(4)异步和非阻塞
异步和非阻塞Ratpack 从其底层设计和实现都是异步的,极大的减少线程阻塞在 I/O 上的等待时间,从而提升应用的性能。Ratpack 的底层 Netty 提供了事件驱动和非阻塞的 HTTP I/O,类似 NodeJS。Ratpack 库提供的请求处理也是异步完成的。相比于传统的 Servlet 框架的同步方式,Ratpack 的异步方式的性能更优,所耗费的系统资源较少,不过对习惯了同步方式的开发人员来说,需要一定时间的学习和适应。
Promise 是 Ratpack 中代码执行的重要接口,表示的是一个会在将来变得可用的值。Ratpack 中的 Promise 的概念,与 JavaScript 框架常用的 Promise 的概念是相同的。当 Promise 中所表示的值变得可用时,该 Promise 上关联的回调函数会被调用。代码清单中已经展示了 Promise 的用法。Context 的 parse 返回的就是一个 Promise 对象。通过 Promise 的 then 方法可以添加一个以 Action 接口来表示的回调函数。当 parse 完成解析时,所得到的结果会被传递给回调函数。在使用 Ratpack 库时,经常会看到类似这样的代码结构。
在 Ratpack 应用中可能需要调用第三方服务,而这些服务本身是同步执行的,如调用一个第三方的 Web 服务。在这种情况下,Ratpack 提供了一个单独的线程池,用来执行这些同步操作,并提供了相关的 API 来把同步操作转换成异步操作。Blocking 对象的 get 方法可以把一个同步操作转换成 Promise 对象,从而可以与 Ratpack 应用中的其他部分进行整合。
在 中,Blocking 的 get 方法从一个 Factory 函数接口中获取到所需的结果。这里通过线程睡眠来模拟耗时较长的同步操作。由于 Blocking 的 get 方法返回的是 Promise 对象,可以直接通过 then 方法来对返回的结果进行处理。
清单 9. 在 Ratpack 中使用同步方法1
2
3
4
5
6
7
8
9
10
11
12
| public class BlockingGet {
public static void main(String[] args) throws Exception {
RatpackServer.start(server ->
server.handlers(chain ->
chain.get(ctx -> Blocking.get(() -> {
Thread.sleep(5000);
return "Hello World";
}).then(str -> ctx.render(str)))
)
);
}
}
|
需要注意的是,Ratpack 的 Promise 可能会造成所谓的回调函数地狱问题(callback hell),即过多层次的回调函数嵌套。对于这样的情况,可以考虑使用 RxJava 来解决。
响应式流Ratpack 提供了对流处理的功能。Ratpack 的流处理 API 基于标准的 Reactive Streams API。Reactive Streams 是 JVM 上进行非阻塞带背压(back pressure) 的异步流处理的规范。Reactive Streams 的一个重要特征是支持带背压的流控制,其核心思想是由流的消费者来通知流的生产者其所能处理的数据量。这样可以避免处理速度较慢的消费者占用生产者的过多资源。
Reactive Streams API 中定义了 3 个最基本的接口,Publisher、Subscriber 和 Subscription。Publisher 是数据的发送者,其中的 subscribe 方法允许数据的消费者 Subscriber 进行注册并开始消费数据。Subscriber 是数据的消费者,其中定义了不同的事件回调方法。当在 Publisher 上注册成功之后,Subscriber 接口的 onSubscribe 方法会被调用,表示当前注册的 Subscription 接口的对象作为参数传入。当有数据可以消费时,Subscriber 接口的 onNext 方法会被调用,产生的数据作为 onNext 的参数传入;当不再有任何数据可用时,onComplete 方法会被调用;当产生数据出现错误时,onError 方法会被调用,与错误相关的 Throwable 对象作为参数传入。前面提到的支持带背压的流控制体现在 Subscription 接口中。Subscription 接口的 request 方法用来通知 Publisher 发送指定数量的数据,这样 Subscriber 就可以只要求它所能处理的数据量。cancel 方法用来通知 Publisher 接口停止发送数据。
Ratpack 对响应式流的支持体现在可以直接使用响应式流作为 HTTP 的响应,服务器推送事件(Server Sent Events)和 WebSocket 的数据源。以作为 HTTP 的响应为例,Ratpack 可以把响应式流的内容以 HTTP 分块传输编码(Chunk Encoding)的格式进行发送。
中展示了一个从文件系统中读取文件内容并作为 HTTP 请求的响应的示例。为了代码可以工作,需要额外添加 ratpack-rx 和 rxjava-reactive-streams 两个依赖包,其中 ratpack-rx 是 Ratpack 提供的与 RxJava 集成的库,而 rxjava-reactive-streams 是 RxJava 与 Reactive Streams API 之间的桥接库。首先使用 Files 类的 readAllLines 方法从文件中读取所有行来得到一个 List<String>对象,再从该对象中创建一个 RxJava 的 Observable,再把该 Observable 对象转换成 Reactive Streams API 的 Publisher 对象。Ratpack 的 ratpack.http.ResponseChunks.stringChunks 方法可以把一个 Publisher 对象以 HTTP 分块编码的格式直接发送。
清单 10. 使用响应式流读取文件并发送1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public class FileWriter {
public static void main(String[] args) throws Exception {
RxRatpack.initialize();
new FileWriter().start();
}
public void start() throws Exception {
RatpackServer.start(server ->
server
.serverConfig(c -> c.baseDir(Paths.get("app").toAbsolutePath()))
.handlers(chain ->
chain.get(ctx -> {
Publisher<String> publisher = RxReactiveStreams.toPublisher(
Observable.from(
Files.readAllLines(
ctx.getFileSystemBinding().file("largefile.txt"))));
ctx.render(stringChunks(publisher));
})
)
);
}
}
|
|
|
|
|
|
|