Reactive Messaging

Quarkus 允许不同的 bean 使用异步信息交互,强制解耦。 消息被发送至 虚拟地址。 它提供了三种类型的交付机制:

  • 点对点-发送消息,只有一个消费者接收它。 如果几个消费者监听地址,a round robin is applied;

  • 发布/订阅 - 发布消息,监听地址的所有消费者都收到消息;

  • 请求/回复 - 发送消息并期待回复。 接收者可以异步方式响应消息

所有这些运载机制都是非阻塞的,并且为建立反应式应用提供基石。

The asynchronous message passing feature allows replying to messages which is not supported by Reactive Messaging. 然而,它仅限于单事件行为(无流)和本地信息。

安装

此机制使用 Vert.x EventBus ,因此您需要启用 vertx 扩展才能使用此功能。 如果您正在创建一个新项目,请设置 extensions 参数如下:

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-quickstart \
    -Dextensions="vertx"
cd vertx-quickstart

如果您已经创建了一个项目,extension vertx 可以添加到现有的 Quarkus 项目中,使用 add-extension 命令:

./mvnw quarkus:add-extension -Dextensions="vertx"

此外,你可以手动将这个添加到你的 pom.xml 文件的依赖部分:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>

消费事件

要消耗事件,请使用 io.quarkus.vertx.ConsumeEvent 注解:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 如果未设置,地址是 bean 的完全限定名称。例如,在这个代码段中,它是`org.acme.vertx.GreetingService` 。
2 方法参数是消息正文。 如果方法返回 something ,它是消息响应。

默认情况下,消费事件的代码必须是 非阻塞的,正如它在 Vert.x event loop 上所调用。 如果您的处理是阻塞的,请使用 blocking 属性:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

也可以返回一个 io.smallrye.mutiny.Unijava.util.combuttionStage 异步处理:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

上一个示例使用 Mutiny 反应式类型,如果你不熟悉,我们建议阅读 反应式入门指南

配置地址

@ConsumeEvent 注解可以设置地址:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 接收发给 greeting 地址的消息

回复

注解了 @ConsumeEvent 的方法 return 值用作响应收到的消息。 例如,在下面的代码段中,返回的 String 是应答。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

您也可以返回一个 Uni<T>CompletionStage<T> 来处理异步回复:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果您使用上下文传播 extension ,您可以注入一个 executor

@Inject Executor executor;

Implementing fire and forget interactions

回复收到的消息不是必须的。 典型的情况是,对于_fire 和 forget_ 交互,消息被消费,发件者不需要知道它。 为了实现这一点,您的消费方法只需返回 void

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

处理消息

如上所述,这一机制是以 Vert.x event bus 为基础的。 因此,您也可以直接使用 Message

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

发送消息

好了,我们看到了如何接收消息,让我们现在切换到 另一端: 发送者。 使用 Vert.x event bus 发送和发布消息:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(@PathParam String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().apply(Message::body);
    }
}
1 注入 Event bus
2 发送信息给地址 greeting。 Message payload is name

EventBus 对象提供了方法:

  1. send 消息到某个特定地址——单个消费者接收消息。

  2. publish 消息到一个特定的地址 - 所有消费者都收到消息。

  3. send 消息并期待回复

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().apply(Message::body);

都连在一起——连接 HTTP 和消息

让我们重温 greeting HTTP 接口 并使用异步消息将调用委托给另一个分离的 bean。 它使用请求/应答发送机制。 我们不在 JAX-RS 接口内执行业务逻辑,而是发出一个信息。 此消息被另一个 bean 消耗,响应将使用 reply 机制发送。

首先创建一个新项目,使用:

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-http-quickstart \
    -Dextensions="vertx"
cd vertx-http-quickstart

您已经可以使用 ./mvnw compile quarkus:devdev 模式 下启动应用了。

然后,创建一个新的 JAX-RS 接口,内容如下:

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(@PathParam String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().apply(Message::body);            (2)
    }
}
1 发送 namegreeting 地址并请求回复
2 当我们得到响应时,提取响应内容并发送给用户

如果您调用此接口,您将等待并超时。 事实上,没有人在监听。 因此,我们需要监听 greeting 的消费者。 创建一个带有以下内容的 GreetingService bean:

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

这个bean 接收名字,然后返回问候消息。

现在打开您的浏览器访问 http://localhost:8080/async/Quarkus,您将看到:

Hello Quarkus

为了更好地理解,让我们详细了解 HTTP 请求/响应是如何处理的:

  1. 通过 hello 方法收到请求

  2. 包含 name 的消息被发送到 event bus

  3. 另一个 bean 收到此消息并响应

  4. 此响应将使用回复机制发送

  5. 发送者收到回复后,内容将被写入 HTTP 响应

此应用程序可以使用以下方式打包:

./mvnw clean package

您也可以通过以下方式将其编译成 native 可执行文件:

./mvnw clean package -Pnative
quarkus.pro 是基于 quarkus.io 的非官方中文翻译站 ,最后更新 2020/04 。
沪ICP备19006215号-8
QQ交流群:1055930959
微信群: