Kafka
本指南展示您的 Quarkus 应用程序如何使用 MicroProfile Reactive Messaging 与 Apache Kafka 交互。
必备条件
要完成本指南,您需要:
-
15 分钟以内
-
IDE
-
安装了JDK 1.8+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.6.2+
-
一个能运行的 Kafka 群集,或用 Docker Compose 来启动一个开发集群
-
如果您想在 native 模式下运行需要安装好 GraalVM 。
结构
在本指南中,我们将在一个组件中生成 (随机) 价格。
这些价格写入 Kafka topic (prices
) 。
第二个组件从 Kafka topic prices
中读取,并对价格进行一些魔法转换。
结果被发送到给 JAX-RS 接口消费的内存流。
数据通过 Sse (server-sent events) 发送到浏览器。
成果
我们建议您按照下面章节指示,一步一步创建应用程序。 但是你也可以直接跳到已完成的例子。
克隆 Git 仓库: git clone https://github.com/quarkusio/quarkus-quickstarts.git
, 或下载 压缩包
解决方案位于 kafka-quickstart
目录。
创建Maven项目
首先,我们需要一个新项目。 通过以下命令创建一个新项目:
mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="kafka"
cd kafka-quickstart
此命令生成一个Maven项目,导入 Reactive Messaging 和Kafka连接器扩展。
启动 Kafka
然后,我们需要一个Kafka 集群。
您可以按照 Apache Kafka 官网 的指示做,或创建一个包含以下内容的 docker-compose.yaml
文件:
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
创建后,运行 docker-compose up
。
这只是开发用的集群,不适用于生产环境。 |
价格生成器
创建 src/main/java/org/acme/kafka/PriceGenerator.java
文件,内容如下:
package org.acme.kafka;
import io.reactivex.Flowable;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* A bean producing random prices every 5 seconds.
* The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
*/
@ApplicationScoped
public class PriceGenerator {
private Random random = new Random();
@Outgoing("generated-price") (1)
public Flowable<Integer> generate() { (2)
return Flowable.interval(5, TimeUnit.SECONDS)
.map(tick -> random.nextInt(100));
}
}
1 | 指示 Reactive Messaging 将返回的流数据传送到 generated-price 。 |
2 | 这个方法返回 RX Java 2 stream (Flowable ) 每5秒钟生成一个随机的 price。 |
该方法返回 Reactive Stream。 生成的数据被发送到名为 generated-price
的流。
此流映射到 Kafka,我们会在创建的 application.properties
文件中指定。
价格转换器
价格转换器读取了 Kafka 的价格并进行了转换。
创建 src/main/java/org/acme/kafka/PriceConverter.java
文件,内容如下:
package org.acme.kafka;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
/**
* A bean consuming data from the "prices" Kafka topic and applying some conversion.
* The result is pushed to the "my-data-stream" stream which is an in-memory stream.
*/
@ApplicationScoped
public class PriceConverter {
private static final double CONVERSION_RATE = 0.88;
@Incoming("prices") (1)
@Outgoing("my-data-stream") (2)
@Broadcast (3)
public double process(int priceInUsd) {
return priceInUsd * CONVERSION_RATE;
}
}
1 | 指示该方法消费 prices topic 中的数据 |
2 | 指明该方法返回的对象将被发送到 my-data-stream 流 |
3 | 指明数据已发送到所有 订阅者 |
prices
主题 (在应用程序配置的) 中的每一条 Kafka record 都调用了 process
方法。
每个结果都会发送到内存流 my-data stream
。
价格接口
最后,让我们的流绑定到 JAX-RS 接口。
创建 src/main/java/org/acme/kafka/PriceResource.java
文件,内容如下:
package org.acme.kafka;
import io.smallrye.reactive.messaging.annotations.Channel;
import org.reactivestreams.Publisher;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;
/**
* A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
*/
@Path("/prices")
public class PriceResource {
@Inject
@Channel("my-data-stream") Publisher<Double> prices; (1)
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
@SseElementType("text/plain") (3)
public Publisher<Double> stream() { (4)
return prices;
}
}
1 | 使用 @Channel 限定符注入 my-data-stream 频道 |
2 | 指示内容使用 Server Sent Events 发送 |
3 | 指示 SSE 包含的数据类型为 text/plain |
4 | 返回流 (Reactive Stream) |
配置 Kafka 连接器
我们需要配置 Kafka 连接器。 这是在 application.properties
文件中完成的。
关键结构如下:
mp.messaging.[outgoing|incoming].{channel-name}.property=value
channel-name
部分必须匹配 @Incoming
和 @Outgoing
注解中设定的值:
-
generated-price
→ 我们写入价格的地方 -
prices
→ 我们读取价格的数据源
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
更多关于此配置的详细信息可在 Kafka文档的 Producer configuration 和 Consumer configuration 部分查阅。
my-data-stream 是什么? 这是一个内存流,未连接到 message broker 。
|
HTML 页面
最后,HTML页面使用 SSE 读取已转换的价格。
创建 src/main/resources/META-INF/resources/prices.html
文件,内容如下:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Prices</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Last price</h2>
<div class="row">
<p class="col-md-12">The last price is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script>
var source = new EventSource("/prices/stream");
source.onmessage = function (event) {
document.getElementById("content").innerHTML = event.data;
};
</script>
</html>
这里没有特别的。 每次收到价格,它都会更新页面。
运行
如果你遵照指示,Kafka应该已经运行。 然后,你只需要运行应用程序:
./mvnw quarkus:dev
在您的浏览器中打开 http://localhost:8080/prices.html
。
如果你使用 docker compose 启动了 Kafka , 运行 docker-compose down 后按 CTRL+C 停掉它。
|
Imperative usage
Sometimes, you need to have an imperative way of sending messages.
例如,如果您需要在 REST 接口收到 POST 请求时内部发送消息到一个流。
在这种情况下,你不能使用 @Output
,因为你的方法有参数。
为此,您可以使用 Emitter
。
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}
Emitter 配置与 @Incoming 和 @Outgoing 使用的流配置相同。
此外,您可以使用 @OnOverflow 来配置背压策略。
|
废弃的
The new |
Kafka 健康检查
如果您使用 quarkus-smallrye-health
扩展,quarkus-kafka
可以添加准备状态健康检查
来验证与经纪人的连接。 默认情况下禁用。
如果启用,当您访问应用程序的 /health/ready
端点时,您将获得关于连接验证状态的信息。
这种行为可以通过将 quarkus.kafka.health.enabled
属性设置为 true
。
JSON 序列化
Quarkus 内置能处理 JSON Kafka 信息。
想象一下,我们有一个 Fruit
pojo:
public class Fruit {
public String name;
public int price;
public Fruit() {
}
public Fruit(String name, int price) {
this.name = name;
this.price = price;
}
}
我们想利用它接收来自Kafka的消息,进行一些价格转换,并将消息发回Kafka。
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
/**
* A bean consuming data from the "fruit-in" Kafka topic and applying some price conversion.
* The result is pushed to the "fruit-out" stream.
*/
@ApplicationScoped
public class FruitProcessor {
private static final double CONVERSION_RATE = 0.88;
@Incoming("fruit-in")
@Outgoing("fruit-out")
@Broadcast
public double process(Fruit fruit) {
fruit.price = fruit.price * CONVERSION_RATE;
return fruit;
}
}
为此,我们需要设置 JSON-B 或 Jackson 来处理 JSON 序列化。
JSON 序列化正确配置后,您也可以使用 Publisher<Fruit> 和 Emitter<Fruit> 。
|
通过 JSON-B 序列化
首先,您需要包含 quarkus-resteasy-jsonb
extension(如果您已经使用 quarkus-resteasy-jsonb
扩展就不需要了)。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>
有个现成的 JsonbSerializer
通过JSON-B 用来序列化所有 pojos,
但是对应的反序列化器是通用的,所以它需要创建一个反序列化的子类。
因此,让我们创建一个 FruitDeserializer
来扩展通用的 JsonbDeserializer
。
package com.acme.fruit.jsonb;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
public class FruitDeserializer extends JsonbDeserializer<Fruit> {
public FruitDeserializer(){
// pass the class to the parent.
super(Fruit.class);
}
}
如果你不想为你的每个pojo创建一个反序列化器,你可以使用通用的`io.vertx.kafka.client.serialization.JsonObjectDeserializer`
反序列化成 javax.json.JsonObject 。 也可以使用相应的序列转换器:io.vertx.kafka.client.serialization.JsonObjectSerializer 。
|
最后,配置您的流使用 JSON-B 序列化器和反序列化器。
# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
现在,您的 Kafka 信息将包含 JSON-B 序列化 Fruit pojo 后的 json。
通过 Jackson 序列化
首先,你需要包含 quarkus-resteasy-jackson
扩展(如果你已经使用 quarkus-jackson-jsonb
扩展就不需要了)。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
有个现成的 ObjectMapperSerializer
可用于通过 Jackson 对所有pojos 进行序列化。
但是相应的反序列化器是通用的,所以它需要子类化。
因此,让我们创建一个 FruitDeserializer
扩展下 ObjectMapperDeserializer
。
package com.acme.fruit.jackson;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
public FruitDeserializer(){
// pass the class to the parent.
super(Fruit.class);
}
}
最后,配置您的流使用Jackson序列化器和反序列化器。
# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
现在,您的 Kafka 信息将包含 Jackson 序列化 Fruit pojo 后的 json。
发送 JSON 服务器发送事件 (SSE)
如果你想 RESTEasy 发送 JSON SSE (Server-Sent Events),你需要使用 @SseElementType
来定义事件的内容类型。
此方法将注解为 @Produces(MediaType.SERVER_SENT_EVENTS)
.
下面的示例显示如何从 Kafka topic 来源使用 SSE 。
import io.smallrye.reactive.messaging.annotations.Channel;
import org.reactivestreams.Publisher;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;
@Path("/fruits")
public class PriceResource {
@Inject
@Channel("fruit-out") Publisher<Fruit> fruits;
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Publisher<Fruit> stream() {
return fruits;
}
}
进一步发展
本指南展示了您如何使用 Quarkus 与 Kafka 交互。 它使用 MicroProfile Reactive Messaging 来构建数据流应用。
If you want to go further check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.