Reactive Messaging
Quarkus 允许不同的 bean 使用异步信息交互,强制解耦。 消息被发送至 虚拟地址。 它提供了三种类型的交付机制:
-
点对点-发送消息,只有一个消费者接收它。 如果几个消费者监听地址,a round robin is applied;
-
发布/订阅 - 发布消息,监听地址的所有消费者都收到消息;
-
请求/回复 - 发送消息并期待回复。 接收者可以异步方式响应消息
所有这些运载机制都是非阻塞的,并且为建立反应式应用提供基石。
The asynchronous message passing feature allows replying to messages which is not supported by Reactive Messaging. 然而,它仅限于单事件行为(无流)和本地信息。 |
安装
此机制使用 Vert.x EventBus ,因此您需要启用 vertx
扩展才能使用此功能。
如果您正在创建一个新项目,请设置 extensions
参数如下:
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=vertx-quickstart \
-Dextensions="vertx"
cd vertx-quickstart
如果您已经创建了一个项目,extension vertx
可以添加到现有的 Quarkus 项目中,使用
add-extension
命令:
./mvnw quarkus:add-extension -Dextensions="vertx"
此外,你可以手动将这个添加到你的 pom.xml
文件的依赖部分:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
消费事件
要消耗事件,请使用 io.quarkus.vertx.ConsumeEvent
注解:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果未设置,地址是 bean 的完全限定名称。例如,在这个代码段中,它是`org.acme.vertx.GreetingService` 。 |
2 | 方法参数是消息正文。 如果方法返回 something ,它是消息响应。 |
默认情况下,消费事件的代码必须是 非阻塞的,正如它在 Vert.x event loop 上所调用。
如果您的处理是阻塞的,请使用
|
也可以返回一个 io.smallrye.mutiny.Uni
或 java.util.combuttionStage
异步处理:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
上一个示例使用 Mutiny 反应式类型,如果你不熟悉,我们建议阅读 反应式入门指南 。 |
配置地址
@ConsumeEvent
注解可以设置地址:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发给 greeting 地址的消息 |
回复
注解了 @ConsumeEvent
的方法 return 值用作响应收到的消息。
例如,在下面的代码段中,返回的 String
是应答。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
您也可以返回一个 Uni<T>
或 CompletionStage<T>
来处理异步回复:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果您使用上下文传播 extension ,您可以注入一个
|
发送消息
好了,我们看到了如何接收消息,让我们现在切换到 另一端: 发送者。 使用 Vert.x event bus 发送和发布消息:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) (2)
.onItem().apply(Message::body);
}
}
1 | 注入 Event bus |
2 | 发送信息给地址 greeting 。 Message payload is name |
EventBus
对象提供了方法:
-
send
消息到某个特定地址——单个消费者接收消息。 -
publish
消息到一个特定的地址 - 所有消费者都收到消息。 -
send
消息并期待回复
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().apply(Message::body);
都连在一起——连接 HTTP 和消息
让我们重温 greeting HTTP 接口 并使用异步消息将调用委托给另一个分离的 bean。 它使用请求/应答发送机制。 我们不在 JAX-RS 接口内执行业务逻辑,而是发出一个信息。 此消息被另一个 bean 消耗,响应将使用 reply 机制发送。
首先创建一个新项目,使用:
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=vertx-http-quickstart \
-Dextensions="vertx"
cd vertx-http-quickstart
您已经可以使用 ./mvnw compile quarkus:dev
在 dev 模式 下启动应用了。
然后,创建一个新的 JAX-RS 接口,内容如下:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) (1)
.onItem().apply(Message::body); (2)
}
}
1 | 发送 name 到 greeting 地址并请求回复 |
2 | 当我们得到响应时,提取响应内容并发送给用户 |
如果您调用此接口,您将等待并超时。 事实上,没有人在监听。
因此,我们需要监听 greeting
的消费者。 创建一个带有以下内容的 GreetingService
bean:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
这个bean 接收名字,然后返回问候消息。
现在打开您的浏览器访问 http://localhost:8080/async/Quarkus,您将看到:
Hello Quarkus
为了更好地理解,让我们详细了解 HTTP 请求/响应是如何处理的:
-
通过
hello
方法收到请求 -
包含 name 的消息被发送到 event bus
-
另一个 bean 收到此消息并响应
-
此响应将使用回复机制发送
-
发送者收到回复后,内容将被写入 HTTP 响应
此应用程序可以使用以下方式打包:
./mvnw clean package
您也可以通过以下方式将其编译成 native 可执行文件:
./mvnw clean package -Pnative