Getting Started Reactive

学习如何用 Quarkus 创建一个反应式应用程序,并探索 Quarkus 提供的不同的反应式功能。 本指南涵盖:

  • 快速查看 Quarkus 引擎以及它如何启用反应模式

  • 对Mutiny的简要介绍――Quarkus使用的反应式编程类库

  • 引导一个被动应用程序

  • 创建一个反应式的 JAX-RS 接口(asynchrons, streams…​)

  • 反应式访问数据库

  • 与其它反应式 API 交互

必备条件

要完成本指南,您需要:

  • 15 分钟以内

  • IDE

  • 安装了JDK 8+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.6.2+

成果

我们建议您按照Boostrapping project及以后的指示一步一步创建应用程序。

但是你也可以直接跳到已完成的示例。

下载 压缩包 或克隆一个 git 仓库:

git clone https://github.com/quarkusio/quarkus-quickstarts.git

代码在 getting-started-reactivegetting-started-reactive-crud 目录中。

多方面了解 Quarkus 中的反应式

Quarkus 是反应式的. 如果你深入底层,会发现一个反应式引擎 Quarkus 应用程序提供动力。 此引擎是 Eclipse Vert.x (https://vertx.io)。 Every IO interaction passes through the non-blocking and reactive Vert.x engine.

Quarkus is based on a reactive engine

让我们举两个例子来解释它如何工作。 想象传入的 HTTP 请求。 (Vert.x) HTTP 服务器接收请求,然后将其路由到应用程序。 如果请求的是 JAX-RS 资源, 路由层在 worker 线程中调用资源方法,并在数据可用时写入响应。 迄今为止,没有问题。 以下图片描述了这种行为。

Behavior when using the imperative routes

But if the HTTP request targets a reactive (non-blocking) route, the routing layer invokes the route on the IO thread giving lots of benefits such as higher concurrency and performance:

Behavior when using the reactive routes

因此,许多 Quarkus 组件设计为反应式,例如数据库访问(PostgreSQL、MySQL、Mongo等),应用程序服务(邮件、模板引擎等)、消息(Kafka、AMQP等) 等等。 但是,为了从这个模型中充分受益,应用代码应该以非阻塞范式编写。 That’s where having a reactive API is an ultimate weapon.

Mutiny —— 一个反应式编程类库

Mutiny 是一个反应式编程库,允许表达和形成异步操作。 它提供两种类型:

  • io.smallrye.mutiny.Uni - 返回0或1个结果的异步操作

  • io.smallrye.mutiny.Multi - 返回多项 (带背压) 流

这两种类型都是 lazy 并遵循订阅模版。 仅在实际需要时(即订阅者注册时)开始计算。

uni.subscribe().with(
    result -> System.out.println("result is " + result),
    failure -> failure.printStackTrace()
);

multi.subscribe().with(
    item -> System.out.println("Got " + item),
    failure -> failure.printStackTrace()
);

UniMulti 都暴露了事件驱动的 API: 你想在事情发生时(成功、 失败等) 做什么。 这些API被分组(操作类型),以使其更清晰避免单个 class 出现100种方法。 操作的主要类型是对失败、完成几元素操纵、提取或收集作出反应。 它用一套可理解的 API 提供了一个简易的编码体验,结果不需要太多反应式的知识。

httpCall
    .onFailure().recoverWithItem("my fallback");

您可能会想知道 Reactive Streams (https://www.reactive-streams.org/). Multi 实现了Reactive Streams Publisher,所以实现了反应流的背压机制。 Uni 没有实现 Publisher ,因为 Uni 的订阅足以表明你对结果感兴趣。 由于Reactive Streams订阅/请求仪式较为复杂,因此还是想到更简单和更顺利的API。

UniMulti 两者是 Quarkus 的反应式必要的支柱,为必要的构建提供了桥梁。 例如,你可以将 Multi 转换为 Iterableawait 一个由 Uni 产生的数据。

// Block until the result is available
String result = uni.await().indefinitely();

// Transform an asynchronous stream into a blocking iterable
stream.subscribe().asIterable().forEach(s -> System.out.println("Item is " + s));

在这一点上,如果你是RxJava 或 Reactor 用户,你可能会想知道如何能够使用你熟悉的 Flowable, Single, Flux, Mono…​ Mutiny 允许将 RX Java 和 Reactor 类型与 UnisMultis 相互转换:

Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Flux<String> flux = multi.convert().with(MultiReactorConverters.toFlux());

但是,Vert.x 怎么样? Vert.x API 也可以使用 Mutiny 类型。 下面的代码片段显示了 Vert.x Web Client 的用法:

// Use io.vertx.mutiny.ext.web.client.WebClient
client = WebClient.create(vertx,
                new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
                        .setTrustAll(true));
// ...
Uni<JsonObject> uni =
    client.get("/api/fruit/" + name)
        .send()
        .onItem().apply(resp -> {
            if (resp.statusCode() == 200) {
                return resp.bodyAsJsonObject();
            } else {
                return new JsonObject()
                        .put("code", resp.statusCode())
                        .put("message", resp.bodyAsString());
            }
        });

最后但并非最不重要的是,Mutiny 已经内置与 MicroProfile Context Propagation,以便您在反应式管道中传播事务, 可追踪性数据,等等。

说得够多了,让我们开始动手!

初始化项目

创建一个新的 Quarkus 项目的最简单方式是打开一个终端并运行以下命令:

适用于 Linux 和 macOS 用户

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started-reactive \
    -DclassName="org.acme.quickstart.ReactiveGreetingResource" \
    -Dpath="/hello" \
    -Dextensions="resteasy-mutiny, resteasy-jsonb"
cd getting-started-reactive

适用于Windows用户

  • 如果使用 cmd, (不要使用前方斜线 \ )

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create -DprojectGroupId=org.acme -DprojectArtifactId=getting-started-reactive -DclassName="org.acme.quickstart.ReactiveGreetingResource" -Dpath="/hello" -Dextensions="resteasy-mutiny,resteasy-jsonb"
  • 如果使用 Powershell,用双引号换行 -D 参数

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create "-DprojectGroupId=org.acme" "-DprojectArtifactId=getting-started-reactive" "-DclassName=org.acme.quickstart.ReactiveGreetingResource" "-Dpath=/hello" "-Dextensions=resteasy-mutiny,resteasy-jsonb"

它在 ./getting-started-reactive 中生成以下内容:

  • Maven 项目结构

  • 接口 /hello 对应的资源 org.acme.quickstart.ReactiveGreetingResource

  • 关联的单元测试

  • 启动应用程序后可通过 http://localhost:8080 访问的着陆页

  • src/main/docker 中有 nativejvm 模式的 Dockerfile 样板文件

  • 应用程序配置文件

生成的 pom.xml 还声明了 RESTEasy Mutiny 支持和用于序列化的 RESTEasy JSON-B 。

反应式 JAX-RS 资源

在项目创建过程中,创建了 src/main/java/org/acme/quickstart/ReactiveGreetingResource.java 文件,内容如下:

package org.acme.quickstart;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class ReactiveGreetingResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
}

这是一个非常简单的 REST 接口,请求 "/hello" 时返回 "hello" 。

现在我们创建一个包含以下内容的 ReactiveGreetingService 类:

package org.acme.getting.started;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class ReactiveGreetingService {

    public Uni<String> greeting(String name) {
        return Uni.createFrom().item(name)
                .onItem().apply(n -> String.format("hello %s", name));
    }
}

然后编辑 ReactiveGreetingResource 成下边的样子:

package org.acme.getting.started;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.annotations.SseElementType;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import org.reactivestreams.Publisher;

@Path("/hello")
public class ReactiveGreetingResource {

    @Inject
    ReactiveGreetingService service;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/greeting/{name}")
    public Uni<String> greeting(@PathParam String name) {
        return service.greeting(name);
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
}

ReactiveGreetingService 类包含一个简单的方法返回一个 Uni。 此例中,结果项立即被生成,但你可以想象出产生 Uni 的异步API。 我们稍后在本指南中谈到这一点。

现在,启动应用程序:

./mvnw quarkus:dev

运行后,请访问 http://localhost:8080/hello/greeting/neo 来检查是否收到预期的欢迎信息。

处理流

迄今为止,我们只是返回了异步结果。 在本节中,我们扩展一下应用程序用流传多次数据。 这些数据流可以来自 Kafka 或任何其他数据来源,但为了简单起见,我们只是定期生成问候消息。

ReactiveGreetingService 中,添加以下方法:

public Multi<String> greetings(int count, String name) {
  return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .onItem().apply(n -> String.format("hello %s - %d", name, n))
        .transform().byTakingFirstItems(count);
}

每秒钟都生成一条问候消息,并在 count 条消息后停止。

ReactiveGreetingResource 中,添加以下方法:

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/greeting/{count}/{name}")
public Multi<String> greetings(@PathParam int count, @PathParam String name) {
  return service.greetings(count, name);
}

此接口将数据流通过 JSON 数组返回给客户端。 消息的名称和数量使用路径参数指定。

调用接口会产生如下效果:

$ curl http://localhost:8080/hello/greeting/3/neo
["hello neo - 0","hello neo - 1","hello neo - 2"]

我们还可以通过返回一个 Multi 生成 Server-Sent Event 响应:

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(@PathParam int count, @PathParam String name) {
    return service.greetings(count, name);
}

与前一个代码段的唯一区别是生成的类型和表示每个事件类型的注解 @SseElementType 。 由于 @Produces 的注解定义 SERVER_SENT_EVENTS, JAX-RS 需要它知道每次(嵌套) 事件的内容类型。

您可以看下结果:

$ curl -N http://localhost:8080/hello/stream/5/neo
data: hello neo - 0

data: hello neo - 1

data: hello neo - 2

data: hello neo - 3

data: hello neo - 4

使用反应式 APIs

使用 Quarkus 反应式APIs

Quarkus 通过 Mutiny 提供了许多反应式 API。 在本节中, 我们看下如何使用 Reactive PostgreSQL 驱动以非阻塞反应式与数据库进行交互。

创建一个新项目:

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started-reactive-crud \
    -DclassName="org.acme.reactive.crud.FruitResource" \
    -Dpath="/fruits" \
    -Dextensions="resteasy-mutiny, resteasy-jsonb, reactive-pg-client"
cd getting-started-reactive-crud

此应用程序与 PostgreSQL 数据库交互,因此您需要:

docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 \
           --name postgres-quarkus-reactive -e POSTGRES_USER=quarkus_test \
           -e POSTGRES_PASSWORD=quarkus_test -e POSTGRES_DB=quarkus_test \
           -p 5432:5432 postgres:11.2

然后,让我们配置数据源。 打开 src/main/resources/application.properties 并添加以下内容:

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus_test
quarkus.datasource.password=quarkus_test
quarkus.datasource.reactive.url=postgresql://localhost:5432/quarkus_test
myapp.schema.create=true

前三行定义了数据源。 最后一行将指示应用程序初始化后是否插入几项。

现在,让我们创建我们的 entity。 创建具有以下内容的 org.acme.crud.Fruit 类:

package org.acme.reactive.crud;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;

import java.util.stream.StreamSupport;

public class Fruit {

    public Long id;

    public String name;

    public Fruit() {
        // default constructor.
    }

    public Fruit(String name) {
        this.name = name;
    }

    public Fruit(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public static Multi<Fruit> findAll(PgPool client) {
        return client.query("SELECT id, name FROM fruits ORDER BY name ASC")
                // Create a Multi from the set of rows:
                .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
                // For each row create a fruit instance
                .onItem().apply(Fruit::from);
    }

    public static Uni<Fruit> findById(PgPool client, Long id) {
        return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1", Tuple.of(id))
                .onItem().apply(RowSet::iterator)
                .onItem().apply(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
    }

    public Uni<Long> save(PgPool client) {
        return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)", Tuple.of(name))
                .onItem().apply(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
    }

    public Uni<Boolean> update(PgPool client) {
        return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2", Tuple.of(name, id))
                .onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
    }

    public static Uni<Boolean> delete(PgPool client, Long id) {
        return client.preparedQuery("DELETE FROM fruits WHERE id = $1", Tuple.of(id))
                .onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
    }

    private static Fruit from(Row row) {
        return new Fruit(row.getLong("id"), row.getString("name"));
    }
}

entity 包含几个字段和方法来查找、更新和删除数据库中的行。 这些方法返回 UnisMultis ,因为在检索后会异步产生数据。 注意反应式 PostgreSQL 客户端已经提供 UniMulti 两个实例。 所以你只将数据库中的结果转换为 business-friendly 对象。

然后,让我们在 FruitResource 中使用这个 Fruit 类。 然后编辑 FruitResource 成下边的样子:

package org.acme.reactive.crud;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import java.net.URI;

@Path("fruits")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {

    @Inject
    @ConfigProperty(name = "myapp.schema.create", defaultValue = "true")
    boolean schemaCreate;

    @Inject
    PgPool client;

    @PostConstruct
    void config() {
        if (schemaCreate) {
            initdb();
        }
    }

    private void initdb() {
        client.query("DROP TABLE IF EXISTS fruits")
                .flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)"))
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')"))
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')"))
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')"))
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')"))
                .await().indefinitely();
    }

    @GET
    public Multi<Fruit> get() {
        return Fruit.findAll(client);
    }

    @GET
    @Path("{id}")
    public Uni<Response> getSingle(@PathParam Long id) {
        return Fruit.findById(client, id)
                .onItem().apply(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
                .onItem().apply(ResponseBuilder::build);
    }

    @POST
    public Uni<Response> create(Fruit fruit) {
        return fruit.save(client)
                .onItem().apply(id -> URI.create("/fruits/" + id))
                .onItem().apply(uri -> Response.created(uri).build());
    }

    @PUT
    @Path("{id}")
    public Uni<Response> update(@PathParam Long id, Fruit fruit) {
        return fruit.update(client)
                .onItem().apply(updated -> updated ? Status.OK : Status.NOT_FOUND)
                .onItem().apply(status -> Response.status(status).build());
    }

    @DELETE
    @Path("{id}")
    public Uni<Response> delete(@PathParam Long id) {
        return Fruit.delete(client, id)
                .onItem().apply(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
                .onItem().apply(status -> Response.status(status).build());
    }
}

基于 Fruit 类生成的结果,该资源返回 UniMulti 实例。

使用 Vert.x 客户端

上一个示例使用由 Quarkus 提供的 service。 您也可以直接使用 Vert.x 客户端。 Vert.x API 有一个 Mutiny 版本。 此 API 被分成几个你可以独立导入的包:

groupId:artifactId 描述

io.smallrye.reactive:smallrye-mutiny-vertx-core

Mutiny API for Vert.x Core

io.smallrye.reactive:smallrye-mutiny-vertx-mail-client

Vert.x邮件客户端的Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-web-client

Vertiny API 适用于 Vert.x Web 客户端

io.smallrye.reactive:smallrye-mutiny-vertx-mongo-client

Vert.x Mongo 客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-redis-client

Vert.x Redis 客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-cassandra-client

Vert.x Cassandra客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client

Vert.x Consull 客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-kafka-client

Vert.x Kafka客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-amqp-client

Vert.xAMQP客户端的 Mutiny API

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client

Vert.x RabbitMQ 客户端的 Mutiny API

您也可以在 http://smallrye.io/smallrye-reactive-utils/apidocs/ 查看可用的 API。

让我们举一个例子。 将以下依赖添加到您的应用程序:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>

它提供了 Vert.x Web Client 的 Mutiny API。 然后,您可以如下使用 web client :

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    @Inject
    Vertx vertx;

    private WebClient client;

    @PostConstruct
    void initialize() {
        this.client = WebClient.create(vertx,
                new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
                        .setTrustAll(true));
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(@PathParam("name") String name) {
        return client.get("/api/fruit/" + name)
                .send()
                .map(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

有两个要点:

  1. 注入的 Vert.x 实例 io.vertx.mutiny.core.Vertx 类型是 Vert.x 的 Mutiny 变种;

  2. Web Client 由 io.vertx.mutiny.ext.web.client.WebClient 创建。

Mutiny 版本的 Vert.x APIs 也提供了:

  • andAwait 方法,如 sendAndAwaitandAwait 表示调用者线程被阻塞,直到结果可用。 请注意不要以这种方式阻塞 event loop / IO 线程。

  • andForget 方法,例如 writeAndForgetandForget 可用于返回 Uni 的方法。 andForget 表示你不需要由此产生的 Uni 表示操作成功或失败。 然而,请记住,如果您不订阅,操作将不会触发。 andForget 为你管理这个并管理订阅。

  • toMulti 方法允许将 Vert.x ReadStream 转换为 Multi

  • toBlockingIterable / toBlockingStream 方法允许将 Vert.x ReadStream 转换成阻塞式迭代或阻塞式 java.util.Stream

使用 RxJava 或 Reactor APIs

Mutiny 提供实用类将 RxJava 2 和 Project Reactor 转换为 Uni and Multi

RxJava 2 转换器在以下依赖中可用:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-rxjava</artifactId>
</dependency>

所以,如果你有一个 API 返回 RxJava 2 类型(Completable, Single, Maybe, Observable, Flowable),你可以如下创建 Unis and Multis

import io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.mutiny.converters.uni.UniRxConverters;
// ...
Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);

Multi<Void> multiFromCompletable = Multi.createFrom().converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom().converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);

你也可以将 Unis and Multis 转换为 RxJava 类型:

Completable completable = uni.convert().with(UniRxConverters.toCompletable());
Single<Optional<String>> single = uni.convert().with(UniRxConverters.toSingle());
Single<String> single2 = uni.convert().with(UniRxConverters.toSingle().failOnNull());
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Observable<String> observable = uni.convert().with(UniRxConverters.toObservable());
Flowable<String> flowable = uni.convert().with(UniRxConverters.toFlowable());
// ...
Completable completable = multi.convert().with(MultiRxConverters.toCompletable());
Single<Optional<String>> single = multi.convert().with(MultiRxConverters.toSingle());
Single<String> single2 = multi.convert().with(MultiRxConverters
        .toSingle().onEmptyThrow(() -> new Exception("D'oh!")));
Maybe<String> maybe = multi.convert().with(MultiRxConverters.toMaybe());
Observable<String> observable = multi.convert().with(MultiRxConverters.toObservable());
Flowable<String> flowable = multi.convert().with(MultiRxConverters.toFlowable());

Project Reactor 转换器在以下依赖中可用:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-reactor</artifactId>
</dependency>

所以,如果你有一个 API 返回反应堆类型(Mono, Flux),你可以如下创建 Unis and Multis

import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
// ...
Uni<String> uniFromMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), mono);
Uni<String> uniFromFlux = Uni.createFrom().converter(UniReactorConverters.fromFlux(), flux);

Multi<String> multiFromMono = Multi.createFrom().converter(MultiReactorConverters.fromMono(), mono);
Multi<String> multiFromFlux = Multi.createFrom().converter(MultiReactorConverters.fromFlux(), flux);

你也可以将 UnisMultis 转换成 Reactor 类型:

Mono<String> mono = uni.convert().with(UniReactorConverters.toMono());
Flux<String> flux = uni.convert().with(UniReactorConverters.toFlux());

Mono<String> mono2 = multi.convert().with(MultiReactorConverters.toMono());
Flux<String> flux2 = multi.convert().with(MultiReactorConverters.toFlux());

使用 CompletionStages 或 Publisher API

如果你正在使用 ComplettionStage, CompletableFuture, 或 Publisher API,你可以将它转换还原。 首先, UniMulti 都可以从 CompletionStage 或从 Supplier<CompletionStage> 创建。 例如:

CompletableFuture<String> future = Uni
        // Create from a Completion Stage
        .createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"));

Uni 上,你也可以使用 subscribeAssComplettionStage() 生成一个 CompletitionStage ,它能获得 Uni 产生的数据或错误。

您也可以使用 createFrom().publisher(Publisher) 创建 Unis and Multis 。 你可以用 toMultiUni 转成 Publisher 。 的确,Multi 实现了 Publisher

接下来是什么?

本指南是一个 Quarkus 反应式介绍。 Quarkus 有许多功能已经反应式了。 下面的列表给您举出几个例子:

quarkus.pro 是基于 quarkus.io 的非官方中文翻译站 ,最后更新 2020/04 。
沪ICP备19006215号-8
QQ交流群:1055930959
微信群: