Kafka

本指南展示您的 Quarkus 应用程序如何使用 MicroProfile Reactive Messaging 与 Apache Kafka 交互。

必备条件

要完成本指南,您需要:

  • 15 分钟以内

  • IDE

  • 安装了JDK 1.8+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.6.2+

  • 一个能运行的 Kafka 群集,或用 Docker Compose 来启动一个开发集群

  • 如果您想在 native 模式下运行需要安装好 GraalVM 。

结构

在本指南中,我们将在一个组件中生成 (随机) 价格。 这些价格写入 Kafka topic (prices) 。 第二个组件从 Kafka topic prices 中读取,并对价格进行一些魔法转换。 结果被发送到给 JAX-RS 接口消费的内存流。 数据通过 Sse (server-sent events) 发送到浏览器。

Architecture

成果

我们建议您按照下面章节指示,一步一步创建应用程序。 但是你也可以直接跳到已完成的例子。

克隆 Git 仓库: git clone https://github.com/quarkusio/quarkus-quickstarts.git, 或下载 压缩包

解决方案位于 kafka-quickstart 目录

创建Maven项目

首先,我们需要一个新项目。 通过以下命令创建一个新项目:

mvn io.quarkus:quarkus-maven-plugin:1.3.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"
cd kafka-quickstart

此命令生成一个Maven项目,导入 Reactive Messaging 和Kafka连接器扩展。

启动 Kafka

然后,我们需要一个Kafka 集群。 您可以按照 Apache Kafka 官网 的指示做,或创建一个包含以下内容的 docker-compose.yaml 文件:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

创建后,运行 docker-compose up

这只是开发用的集群,不适用于生产环境。

价格生成器

创建 src/main/java/org/acme/kafka/PriceGenerator.java 文件,内容如下:

package org.acme.kafka;

import io.reactivex.Flowable;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * A bean producing random prices every 5 seconds.
 * The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    @Outgoing("generated-price")                        (1)
    public Flowable<Integer> generate() {               (2)
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> random.nextInt(100));
    }

}
1 指示 Reactive Messaging 将返回的流数据传送到 generated-price
2 这个方法返回 RX Java 2 stream (Flowable) 每5秒钟生成一个随机的 price

该方法返回 Reactive Stream。 生成的数据被发送到名为 generated-price 的流。 此流映射到 Kafka,我们会在创建的 application.properties 文件中指定。

价格转换器

价格转换器读取了 Kafka 的价格并进行了转换。 创建 src/main/java/org/acme/kafka/PriceConverter.java 文件,内容如下:

package org.acme.kafka;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
 * A bean consuming data from the "prices" Kafka topic and applying some conversion.
 * The result is pushed to the "my-data-stream" stream which is an in-memory stream.
 */
@ApplicationScoped
public class PriceConverter {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("prices")                                 (1)
    @Outgoing("my-data-stream")                         (2)
    @Broadcast                                          (3)
    public double process(int priceInUsd) {
        return priceInUsd * CONVERSION_RATE;
    }

}
1 指示该方法消费 prices topic 中的数据
2 指明该方法返回的对象将被发送到 my-data-stream
3 指明数据已发送到所有 订阅者

prices 主题 (在应用程序配置的) 中的每一条 Kafka record 都调用了 process 方法。 每个结果都会发送到内存流 my-data stream

价格接口

最后,让我们的流绑定到 JAX-RS 接口。 创建 src/main/java/org/acme/kafka/PriceResource.java 文件,内容如下:

package org.acme.kafka;

import io.smallrye.reactive.messaging.annotations.Channel;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("my-data-stream") Publisher<Double> prices; (1)

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) (2)
    @SseElementType("text/plain") (3)
    public Publisher<Double> stream() { (4)
        return prices;
    }
}
1 使用 @Channel 限定符注入 my-data-stream 频道
2 指示内容使用 Server Sent Events 发送
3 指示 SSE 包含的数据类型为 text/plain
4 返回流 (Reactive Stream)

配置 Kafka 连接器

我们需要配置 Kafka 连接器。 这是在 application.properties 文件中完成的。 关键结构如下:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 部分必须匹配 @Incoming@Outgoing 注解中设定的值:

  • generated-price → 我们写入价格的地方

  • prices → 我们读取价格的数据源

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

更多关于此配置的详细信息可在 Kafka文档的 Producer configurationConsumer configuration 部分查阅。

my-data-stream 是什么? 这是一个内存流,未连接到 message broker 。

HTML 页面

最后,HTML页面使用 SSE 读取已转换的价格。

创建 src/main/resources/META-INF/resources/prices.html 文件,内容如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Last price</h2>
    <div class="row">
    <p class="col-md-12">The last price is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script>
    var source = new EventSource("/prices/stream");
    source.onmessage = function (event) {
        document.getElementById("content").innerHTML = event.data;
    };
</script>
</html>

这里没有特别的。 每次收到价格,它都会更新页面。

运行

如果你遵照指示,Kafka应该已经运行。 然后,你只需要运行应用程序:

./mvnw quarkus:dev

在您的浏览器中打开 http://localhost:8080/prices.html

如果你使用 docker compose 启动了 Kafka , 运行 docker-compose down 后按 CTRL+C 停掉它。

Native 运行

您可以通过以下方式构建 native 可执行文件:

./mvnw package -Pnative

Imperative usage

Sometimes, you need to have an imperative way of sending messages.

例如,如果您需要在 REST 接口收到 POST 请求时内部发送消息到一个流。 在这种情况下,你不能使用 @Output ,因为你的方法有参数。

为此,您可以使用 Emitter

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}
Emitter 配置与 @Incoming@Outgoing 使用的流配置相同。 此外,您可以使用 @OnOverflow 来配置背压策略。
废弃的

io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow 等类现在已被废弃并替换为:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.

Kafka 健康检查

如果您使用 quarkus-smallrye-health 扩展,quarkus-kafka 可以添加准备状态健康检查 来验证与经纪人的连接。 默认情况下禁用。

如果启用,当您访问应用程序的 /health/ready 端点时,您将获得关于连接验证状态的信息。

这种行为可以通过将 quarkus.kafka.health.enabled 属性设置为 true

JSON 序列化

Quarkus 内置能处理 JSON Kafka 信息。

想象一下,我们有一个 Fruit pojo:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

我们想利用它接收来自Kafka的消息,进行一些价格转换,并将消息发回Kafka。

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" Kafka topic and applying some price conversion.
* The result is pushed to the "fruit-out" stream.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public double process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

为此,我们需要设置 JSON-B 或 Jackson 来处理 JSON 序列化。

JSON 序列化正确配置后,您也可以使用 Publisher<Fruit>Emitter<Fruit>

通过 JSON-B 序列化

首先,您需要包含 quarkus-resteasy-jsonb extension(如果您已经使用 quarkus-resteasy-jsonb 扩展就不需要了)。

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>

有个现成的 JsonbSerializer 通过JSON-B 用来序列化所有 pojos, 但是对应的反序列化器是通用的,所以它需要创建一个反序列化的子类。

因此,让我们创建一个 FruitDeserializer 来扩展通用的 JsonbDeserializer

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer(){
        // pass the class to the parent.
        super(Fruit.class);
    }
}
如果你不想为你的每个pojo创建一个反序列化器,你可以使用通用的`io.vertx.kafka.client.serialization.JsonObjectDeserializer` 反序列化成 javax.json.JsonObject 。 也可以使用相应的序列转换器:io.vertx.kafka.client.serialization.JsonObjectSerializer

最后,配置您的流使用 JSON-B 序列化器和反序列化器。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

现在,您的 Kafka 信息将包含 JSON-B 序列化 Fruit pojo 后的 json。

通过 Jackson 序列化

首先,你需要包含 quarkus-resteasy-jackson 扩展(如果你已经使用 quarkus-jackson-jsonb 扩展就不需要了)。

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

有个现成的 ObjectMapperSerializer 可用于通过 Jackson 对所有pojos 进行序列化。 但是相应的反序列化器是通用的,所以它需要子类化。

因此,让我们创建一个 FruitDeserializer 扩展下 ObjectMapperDeserializer

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer(){
        // pass the class to the parent.
        super(Fruit.class);
    }
}

最后,配置您的流使用Jackson序列化器和反序列化器。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

现在,您的 Kafka 信息将包含 Jackson 序列化 Fruit pojo 后的 json。

发送 JSON 服务器发送事件 (SSE)

如果你想 RESTEasy 发送 JSON SSE (Server-Sent Events),你需要使用 @SseElementType 来定义事件的内容类型。 此方法将注解为 @Produces(MediaType.SERVER_SENT_EVENTS).

下面的示例显示如何从 Kafka topic 来源使用 SSE 。

import io.smallrye.reactive.messaging.annotations.Channel;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;

@Path("/fruits")
public class PriceResource {

    @Inject
    @Channel("fruit-out") Publisher<Fruit> fruits;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    public Publisher<Fruit> stream() {
        return fruits;
    }
}

进一步发展

本指南展示了您如何使用 Quarkus 与 Kafka 交互。 它使用 MicroProfile Reactive Messaging 来构建数据流应用。

If you want to go further check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.

quarkus.pro 是基于 quarkus.io 的非官方中文翻译站 ,最后更新 2020/04 。
沪ICP备19006215号-8
QQ交流群:1055930959
微信群: