Getting Started Reactive
学习如何用 Quarkus 创建一个反应式应用程序,并探索 Quarkus 提供的不同的反应式功能。 本指南涵盖:
-
快速查看 Quarkus 引擎以及它如何启用反应模式
-
对Mutiny的简要介绍――Quarkus使用的反应式编程类库
-
引导一个被动应用程序
-
创建一个反应式的 JAX-RS 接口(asynchrons, streams…)
-
反应式访问数据库
-
与其它反应式 API 交互
成果
我们建议您按照Boostrapping project及以后的指示一步一步创建应用程序。
但是你也可以直接跳到已完成的示例。
下载 压缩包 或克隆一个 git 仓库:
git clone https://github.com/quarkusio/quarkus-quickstarts.git
代码在 getting-started-reactive
和 getting-started-reactive-crud
目录中。
多方面了解 Quarkus 中的反应式
Quarkus 是反应式的. 如果你深入底层,会发现一个反应式引擎 Quarkus 应用程序提供动力。 此引擎是 Eclipse Vert.x (https://vertx.io)。 Every IO interaction passes through the non-blocking and reactive Vert.x engine.
让我们举两个例子来解释它如何工作。 想象传入的 HTTP 请求。 (Vert.x) HTTP 服务器接收请求,然后将其路由到应用程序。 如果请求的是 JAX-RS 资源, 路由层在 worker 线程中调用资源方法,并在数据可用时写入响应。 迄今为止,没有问题。 以下图片描述了这种行为。
But if the HTTP request targets a reactive (non-blocking) route, the routing layer invokes the route on the IO thread giving lots of benefits such as higher concurrency and performance:
因此,许多 Quarkus 组件设计为反应式,例如数据库访问(PostgreSQL、MySQL、Mongo等),应用程序服务(邮件、模板引擎等)、消息(Kafka、AMQP等) 等等。 但是,为了从这个模型中充分受益,应用代码应该以非阻塞范式编写。 That’s where having a reactive API is an ultimate weapon.
Mutiny —— 一个反应式编程类库
Mutiny 是一个反应式编程库,允许表达和形成异步操作。 它提供两种类型:
-
io.smallrye.mutiny.Uni
- 返回0或1个结果的异步操作 -
io.smallrye.mutiny.Multi
- 返回多项 (带背压) 流
这两种类型都是 lazy 并遵循订阅模版。 仅在实际需要时(即订阅者注册时)开始计算。
uni.subscribe().with(
result -> System.out.println("result is " + result),
failure -> failure.printStackTrace()
);
multi.subscribe().with(
item -> System.out.println("Got " + item),
failure -> failure.printStackTrace()
);
Uni
和 Multi
都暴露了事件驱动的 API: 你想在事情发生时(成功、 失败等) 做什么。
这些API被分组(操作类型),以使其更清晰避免单个 class 出现100种方法。
操作的主要类型是对失败、完成几元素操纵、提取或收集作出反应。
它用一套可理解的 API 提供了一个简易的编码体验,结果不需要太多反应式的知识。
httpCall
.onFailure().recoverWithItem("my fallback");
您可能会想知道 Reactive Streams (https://www.reactive-streams.org/).
Multi
实现了Reactive Streams Publisher
,所以实现了反应流的背压机制。
Uni
没有实现 Publisher
,因为 Uni
的订阅足以表明你对结果感兴趣。
由于Reactive Streams订阅/请求仪式较为复杂,因此还是想到更简单和更顺利的API。
Uni
和 Multi
两者是 Quarkus 的反应式必要的支柱,为必要的构建提供了桥梁。
例如,你可以将 Multi
转换为 Iterable
或 await 一个由 Uni
产生的数据。
// Block until the result is available
String result = uni.await().indefinitely();
// Transform an asynchronous stream into a blocking iterable
stream.subscribe().asIterable().forEach(s -> System.out.println("Item is " + s));
在这一点上,如果你是RxJava 或 Reactor 用户,你可能会想知道如何能够使用你熟悉的 Flowable
, Single
, Flux
, Mono
…
Mutiny 允许将 RX Java 和 Reactor 类型与 Unis
和 Multis
相互转换:
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Flux<String> flux = multi.convert().with(MultiReactorConverters.toFlux());
但是,Vert.x 怎么样? Vert.x API 也可以使用 Mutiny 类型。 下面的代码片段显示了 Vert.x Web Client 的用法:
// Use io.vertx.mutiny.ext.web.client.WebClient
client = WebClient.create(vertx,
new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
.setTrustAll(true));
// ...
Uni<JsonObject> uni =
client.get("/api/fruit/" + name)
.send()
.onItem().apply(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
最后但并非最不重要的是,Mutiny 已经内置与 MicroProfile Context Propagation,以便您在反应式管道中传播事务, 可追踪性数据,等等。
说得够多了,让我们开始动手!
初始化项目
创建一个新的 Quarkus 项目的最简单方式是打开一个终端并运行以下命令:
适用于 Linux 和 macOS 用户
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started-reactive \
-DclassName="org.acme.quickstart.ReactiveGreetingResource" \
-Dpath="/hello" \
-Dextensions="resteasy-mutiny, resteasy-jsonb"
cd getting-started-reactive
适用于Windows用户
-
如果使用 cmd, (不要使用前方斜线
\
)
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create -DprojectGroupId=org.acme -DprojectArtifactId=getting-started-reactive -DclassName="org.acme.quickstart.ReactiveGreetingResource" -Dpath="/hello" -Dextensions="resteasy-mutiny,resteasy-jsonb"
-
如果使用 Powershell,用双引号换行
-D
参数
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create "-DprojectGroupId=org.acme" "-DprojectArtifactId=getting-started-reactive" "-DclassName=org.acme.quickstart.ReactiveGreetingResource" "-Dpath=/hello" "-Dextensions=resteasy-mutiny,resteasy-jsonb"
它在 ./getting-started-reactive
中生成以下内容:
-
Maven 项目结构
-
接口
/hello
对应的资源org.acme.quickstart.ReactiveGreetingResource
-
关联的单元测试
-
启动应用程序后可通过
http://localhost:8080
访问的着陆页 -
src/main/docker
中有native
及jvm
模式的Dockerfile
样板文件 -
应用程序配置文件
生成的 pom.xml
还声明了 RESTEasy Mutiny 支持和用于序列化的 RESTEasy JSON-B 。
反应式 JAX-RS 资源
在项目创建过程中,创建了 src/main/java/org/acme/quickstart/ReactiveGreetingResource.java
文件,内容如下:
package org.acme.quickstart;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
public class ReactiveGreetingResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "hello";
}
}
这是一个非常简单的 REST 接口,请求 "/hello" 时返回 "hello" 。
现在我们创建一个包含以下内容的 ReactiveGreetingService
类:
package org.acme.getting.started;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
@ApplicationScoped
public class ReactiveGreetingService {
public Uni<String> greeting(String name) {
return Uni.createFrom().item(name)
.onItem().apply(n -> String.format("hello %s", name));
}
}
然后编辑 ReactiveGreetingResource
成下边的样子:
package org.acme.getting.started;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.annotations.SseElementType;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import org.reactivestreams.Publisher;
@Path("/hello")
public class ReactiveGreetingResource {
@Inject
ReactiveGreetingService service;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/greeting/{name}")
public Uni<String> greeting(@PathParam String name) {
return service.greeting(name);
}
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "hello";
}
}
ReactiveGreetingService
类包含一个简单的方法返回一个 Uni
。
此例中,结果项立即被生成,但你可以想象出产生 Uni
的异步API。 我们稍后在本指南中谈到这一点。
现在,启动应用程序:
./mvnw quarkus:dev
运行后,请访问 http://localhost:8080/hello/greeting/neo 来检查是否收到预期的欢迎信息。
处理流
迄今为止,我们只是返回了异步结果。 在本节中,我们扩展一下应用程序用流传多次数据。 这些数据流可以来自 Kafka 或任何其他数据来源,但为了简单起见,我们只是定期生成问候消息。
在 ReactiveGreetingService
中,添加以下方法:
public Multi<String> greetings(int count, String name) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().apply(n -> String.format("hello %s - %d", name, n))
.transform().byTakingFirstItems(count);
}
每秒钟都生成一条问候消息,并在 count
条消息后停止。
在 ReactiveGreetingResource
中,添加以下方法:
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/greeting/{count}/{name}")
public Multi<String> greetings(@PathParam int count, @PathParam String name) {
return service.greetings(count, name);
}
此接口将数据流通过 JSON 数组返回给客户端。 消息的名称和数量使用路径参数指定。
调用接口会产生如下效果:
$ curl http://localhost:8080/hello/greeting/3/neo
["hello neo - 0","hello neo - 1","hello neo - 2"]
我们还可以通过返回一个 Multi
生成 Server-Sent Event 响应:
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(@PathParam int count, @PathParam String name) {
return service.greetings(count, name);
}
与前一个代码段的唯一区别是生成的类型和表示每个事件类型的注解 @SseElementType
。
由于 @Produces
的注解定义 SERVER_SENT_EVENTS
, JAX-RS 需要它知道每次(嵌套) 事件的内容类型。
您可以看下结果:
$ curl -N http://localhost:8080/hello/stream/5/neo
data: hello neo - 0
data: hello neo - 1
data: hello neo - 2
data: hello neo - 3
data: hello neo - 4
使用反应式 APIs
使用 Quarkus 反应式APIs
Quarkus 通过 Mutiny 提供了许多反应式 API。 在本节中, 我们看下如何使用 Reactive PostgreSQL 驱动以非阻塞反应式与数据库进行交互。
创建一个新项目:
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started-reactive-crud \
-DclassName="org.acme.reactive.crud.FruitResource" \
-Dpath="/fruits" \
-Dextensions="resteasy-mutiny, resteasy-jsonb, reactive-pg-client"
cd getting-started-reactive-crud
此应用程序与 PostgreSQL 数据库交互,因此您需要:
docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 \
--name postgres-quarkus-reactive -e POSTGRES_USER=quarkus_test \
-e POSTGRES_PASSWORD=quarkus_test -e POSTGRES_DB=quarkus_test \
-p 5432:5432 postgres:11.2
然后,让我们配置数据源。
打开 src/main/resources/application.properties
并添加以下内容:
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus_test
quarkus.datasource.password=quarkus_test
quarkus.datasource.reactive.url=postgresql://localhost:5432/quarkus_test
myapp.schema.create=true
前三行定义了数据源。 最后一行将指示应用程序初始化后是否插入几项。
现在,让我们创建我们的 entity。
创建具有以下内容的 org.acme.crud.Fruit
类:
package org.acme.reactive.crud;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;
import java.util.stream.StreamSupport;
public class Fruit {
public Long id;
public String name;
public Fruit() {
// default constructor.
}
public Fruit(String name) {
this.name = name;
}
public Fruit(Long id, String name) {
this.id = id;
this.name = name;
}
public static Multi<Fruit> findAll(PgPool client) {
return client.query("SELECT id, name FROM fruits ORDER BY name ASC")
// Create a Multi from the set of rows:
.onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
// For each row create a fruit instance
.onItem().apply(Fruit::from);
}
public static Uni<Fruit> findById(PgPool client, Long id) {
return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1", Tuple.of(id))
.onItem().apply(RowSet::iterator)
.onItem().apply(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
}
public Uni<Long> save(PgPool client) {
return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)", Tuple.of(name))
.onItem().apply(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}
public Uni<Boolean> update(PgPool client) {
return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2", Tuple.of(name, id))
.onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
}
public static Uni<Boolean> delete(PgPool client, Long id) {
return client.preparedQuery("DELETE FROM fruits WHERE id = $1", Tuple.of(id))
.onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
}
private static Fruit from(Row row) {
return new Fruit(row.getLong("id"), row.getString("name"));
}
}
此 entity 包含几个字段和方法来查找、更新和删除数据库中的行。
这些方法返回 Unis
或 Multis
,因为在检索后会异步产生数据。
注意反应式 PostgreSQL 客户端已经提供 Uni
和 Multi
两个实例。
所以你只将数据库中的结果转换为 business-friendly 对象。
然后,让我们在 FruitResource
中使用这个 Fruit
类。
然后编辑 FruitResource
成下边的样子:
package org.acme.reactive.crud;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
@Path("fruits")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {
@Inject
@ConfigProperty(name = "myapp.schema.create", defaultValue = "true")
boolean schemaCreate;
@Inject
PgPool client;
@PostConstruct
void config() {
if (schemaCreate) {
initdb();
}
}
private void initdb() {
client.query("DROP TABLE IF EXISTS fruits")
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)"))
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')"))
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')"))
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')"))
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')"))
.await().indefinitely();
}
@GET
public Multi<Fruit> get() {
return Fruit.findAll(client);
}
@GET
@Path("{id}")
public Uni<Response> getSingle(@PathParam Long id) {
return Fruit.findById(client, id)
.onItem().apply(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
.onItem().apply(ResponseBuilder::build);
}
@POST
public Uni<Response> create(Fruit fruit) {
return fruit.save(client)
.onItem().apply(id -> URI.create("/fruits/" + id))
.onItem().apply(uri -> Response.created(uri).build());
}
@PUT
@Path("{id}")
public Uni<Response> update(@PathParam Long id, Fruit fruit) {
return fruit.update(client)
.onItem().apply(updated -> updated ? Status.OK : Status.NOT_FOUND)
.onItem().apply(status -> Response.status(status).build());
}
@DELETE
@Path("{id}")
public Uni<Response> delete(@PathParam Long id) {
return Fruit.delete(client, id)
.onItem().apply(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
.onItem().apply(status -> Response.status(status).build());
}
}
基于 Fruit
类生成的结果,该资源返回 Uni
和 Multi
实例。
使用 Vert.x 客户端
上一个示例使用由 Quarkus 提供的 service。 您也可以直接使用 Vert.x 客户端。 Vert.x API 有一个 Mutiny 版本。 此 API 被分成几个你可以独立导入的包:
groupId:artifactId | 描述 |
---|---|
|
Mutiny API for Vert.x Core |
|
Vert.x邮件客户端的Mutiny API |
|
Vertiny API 适用于 Vert.x Web 客户端 |
|
Vert.x Mongo 客户端的 Mutiny API |
|
Vert.x Redis 客户端的 Mutiny API |
|
Vert.x Cassandra客户端的 Mutiny API |
|
Vert.x Consull 客户端的 Mutiny API |
|
Vert.x Kafka客户端的 Mutiny API |
|
Vert.xAMQP客户端的 Mutiny API |
|
Vert.x RabbitMQ 客户端的 Mutiny API |
您也可以在 http://smallrye.io/smallrye-reactive-utils/apidocs/ 查看可用的 API。
让我们举一个例子。 将以下依赖添加到您的应用程序:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
它提供了 Vert.x Web Client 的 Mutiny API。 然后,您可以如下使用 web client :
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/fruit-data")
public class ResourceUsingWebClient {
@Inject
Vertx vertx;
private WebClient client;
@PostConstruct
void initialize() {
this.client = WebClient.create(vertx,
new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
.setTrustAll(true));
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(@PathParam("name") String name) {
return client.get("/api/fruit/" + name)
.send()
.map(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
有两个要点:
-
注入的 Vert.x 实例
io.vertx.mutiny.core.Vertx
类型是 Vert.x 的 Mutiny 变种; -
Web Client 由
io.vertx.mutiny.ext.web.client.WebClient
创建。
Mutiny 版本的 Vert.x APIs 也提供了:
-
andAwait
方法,如sendAndAwait
。andAwait
表示调用者线程被阻塞,直到结果可用。 请注意不要以这种方式阻塞 event loop / IO 线程。 -
andForget
方法,例如writeAndForget
。andForget
可用于返回Uni
的方法。andForget
表示你不需要由此产生的Uni
表示操作成功或失败。 然而,请记住,如果您不订阅,操作将不会触发。andForget
为你管理这个并管理订阅。 -
toMulti
方法允许将 Vert.xReadStream
转换为Multi
-
toBlockingIterable
/toBlockingStream
方法允许将 Vert.xReadStream
转换成阻塞式迭代或阻塞式java.util.Stream
使用 RxJava 或 Reactor APIs
Mutiny 提供实用类将 RxJava 2 和 Project Reactor 转换为 Uni
and Multi
。
RxJava 2 转换器在以下依赖中可用:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-rxjava</artifactId>
</dependency>
所以,如果你有一个 API 返回 RxJava 2 类型(Completable
, Single
, Maybe
, Observable
, Flowable
),你可以如下创建 Unis
and Multis
:
import io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.mutiny.converters.uni.UniRxConverters;
// ...
Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);
Multi<Void> multiFromCompletable = Multi.createFrom().converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom().converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);
你也可以将 Unis
and Multis
转换为 RxJava 类型:
Completable completable = uni.convert().with(UniRxConverters.toCompletable());
Single<Optional<String>> single = uni.convert().with(UniRxConverters.toSingle());
Single<String> single2 = uni.convert().with(UniRxConverters.toSingle().failOnNull());
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Observable<String> observable = uni.convert().with(UniRxConverters.toObservable());
Flowable<String> flowable = uni.convert().with(UniRxConverters.toFlowable());
// ...
Completable completable = multi.convert().with(MultiRxConverters.toCompletable());
Single<Optional<String>> single = multi.convert().with(MultiRxConverters.toSingle());
Single<String> single2 = multi.convert().with(MultiRxConverters
.toSingle().onEmptyThrow(() -> new Exception("D'oh!")));
Maybe<String> maybe = multi.convert().with(MultiRxConverters.toMaybe());
Observable<String> observable = multi.convert().with(MultiRxConverters.toObservable());
Flowable<String> flowable = multi.convert().with(MultiRxConverters.toFlowable());
Project Reactor 转换器在以下依赖中可用:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-reactor</artifactId>
</dependency>
所以,如果你有一个 API 返回反应堆类型(Mono
, Flux
),你可以如下创建 Unis
and Multis
:
import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
// ...
Uni<String> uniFromMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), mono);
Uni<String> uniFromFlux = Uni.createFrom().converter(UniReactorConverters.fromFlux(), flux);
Multi<String> multiFromMono = Multi.createFrom().converter(MultiReactorConverters.fromMono(), mono);
Multi<String> multiFromFlux = Multi.createFrom().converter(MultiReactorConverters.fromFlux(), flux);
你也可以将 Unis
和 Multis
转换成 Reactor 类型:
Mono<String> mono = uni.convert().with(UniReactorConverters.toMono());
Flux<String> flux = uni.convert().with(UniReactorConverters.toFlux());
Mono<String> mono2 = multi.convert().with(MultiReactorConverters.toMono());
Flux<String> flux2 = multi.convert().with(MultiReactorConverters.toFlux());
使用 CompletionStages 或 Publisher API
如果你正在使用 ComplettionStage
, CompletableFuture
, 或 Publisher
API,你可以将它转换还原。
首先, Uni
和 Multi
都可以从 CompletionStage
或从 Supplier<CompletionStage>
创建。 例如:
CompletableFuture<String> future = Uni
// Create from a Completion Stage
.createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"));
在 Uni
上,你也可以使用 subscribeAssComplettionStage()
生成一个 CompletitionStage
,它能获得 Uni
产生的数据或错误。
您也可以使用 createFrom().publisher(Publisher)
创建 Unis
and Multis
。
你可以用 toMulti
将 Uni
转成 Publisher
。
的确,Multi
实现了 Publisher
。