当前位置: 首页 > 文档资料 > Vert.x 中文文档 >

Integration - Kafka Client

优质
小牛编辑
129浏览
2023-12-01

中英文对照表

组件介绍

此组件提供了 Kafka Client 的集成,可以以 Vert.x 的方式从 Apache Kafka 集群上消费或者发送消息。

对于消费者(consumer),API以异步的方式订阅消费指定的 topic 以及相关的分区(partition),或者将消息以 Vert.x Stream 的方式读取(甚至可以支持暂停(pause)和恢复(resume)操作)。

对于生产者(producer),API提供发送信息到指定 topic 以及相关的分区(partition)的方法,类似于向 Vert.x Stream 中写入数据。

警告:此组件处于技术预览阶段,因此在之后版本中API可能还会发生一些变更。

使用 Vert.x Kafka Client

要使用 Vert.x Kafka Client 组件,需要添加以下依赖:

  • Maven(在 pom.xml文件中):
  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-kafka-client</artifactId>
  4. <version>3.4.1</version>
  5. </dependency>
  • Gradle(在build.gradle文件中):
  1. compile 'io.vertx:vertx-kafka-client:3.4.1'

创建 Kafka Client

创建 Consumer 和 Producer 以及使用它们的方法其实与原生的 Kafka Client 库非常相似,Vert.x 只是做了一层异步封装。

我们需要对 Consumer 与 Producer 进行一些相关的配置,具体可以参考 Apache Kafka 的官方文档:

我们可以通过一个 Map 来包装这些配置,然后将其传入到 KafkaConsumer 接口或 KafkaProducer 接口中的 create 静态方法里来创建 KafkaConsumerKafkaProducer

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  4. config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  5. config.put("group.id", "my_group");
  6. config.put("auto.offset.reset", "earliest");
  7. config.put("enable.auto.commit", "false");
  8. // 创建一个Kafka Consumer
  9. KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

在上面的例子中,我们在创建 KafkaConsumer 实例时传入了一个 Map 实例,用于指定要连接的 Kafka 节点列表(只有一个)以及如何对接收到的消息进行解析以得到 key 与 value。

我们可以用类似的方法来创建 Producer:

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. config.put("acks", "1");
  6. // 创建一个Kafka Producer
  7. KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

另外也可以使用 Properties 来代替 Map:

  1. Properties config = new Properties();
  2. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  3. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  4. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  5. config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
  6. config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  7. config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  8. KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

消息的 key 和 value 的序列化格式也可以作为 create 方法的参数直接传进去,而不是在相关配置中指定:

  1. Properties config = new Properties();
  2. config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  3. config.put(ProducerConfig.ACKS_CONFIG, "1");
  4. // 注意这里的第三和第四个参数
  5. KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);

在这里,我们在创建 KafkaProducer 实例的时候传入了一个 Properties 实例,用于指定要连接的 Kafka 节点列表(只有一个)和消息确认模式。消息 key 和 value 的解析方式作为参数传入 KafkaProducer.create 方法中。

消费感兴趣 Topic 的消息并加入消费组

我们可以通过 KafkaConsumer 的的 subscribe 方法来订阅一个或多个 topic 进行消费,同时加入到某个消费组(consumer group)中(在创建消费者实例时通过配置指定)。当然你需要通过 handler 方法注册一个 Handler 来处理接收的消息:

  1. consumer.handler(record -> {
  2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
  3. ",partition=" + record.partition() + ",offset=" + record.offset());
  4. });
  5. // 订阅多个topic
  6. Set<String> topics = new HashSet<>();
  7. topics.add("topic1");
  8. topics.add("topic2");
  9. topics.add("topic3");
  10. consumer.subscribe(topics);
  11. // 订阅单个主题
  12. consumer.subscribe("a-single-topic");

另外如果想知道消息是否成功被消费掉,可以在调用 subscribe 方法时绑定一个 Handler

  1. consumer.handler(record -> {
  2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
  3. ",partition=" + record.partition() + ",offset=" + record.offset());
  4. });
  5. // subscribe to several topics
  6. Set<String> topics = new HashSet<>();
  7. topics.add("topic1");
  8. topics.add("topic2");
  9. topics.add("topic3");
  10. //这里lambda表达式用于接收消息处理结果
  11. consumer.subscribe(topics, ar -> {
  12. if (ar.succeeded()) {
  13. System.out.println("subscribed");
  14. } else {
  15. System.out.println("Could not subscribe " + ar.cause().getMessage());
  16. }
  17. });
  18. //这里lambda表达式用于接收消息处理结果
  19. consumer.subscribe("a-single-topic", ar -> {
  20. if (ar.succeeded()) {
  21. System.out.println("subscribed");
  22. } else {
  23. System.out.println("Could not subscribe " + ar.cause().getMessage());
  24. }
  25. });

由于Kafka的消费者会组成一个消费组(consumer group),同一个组只有一个消费者可以消费特定的 partition,同时此消费组也可以接纳其他的消费者,这样可以实现 partition 分配给组内其它消费者继续去消费。

如果组内的一个消费者挂了,kafka 集群会自动把 partition 重新分配给组内其他消费者,或者新加入一个消费者去消费对应的 partition。您可以通过 partitionsRevokedHandlerpartitionsAssignedHandler 方法在 KafkaConsumer 里注册一个 Handler 用于监听对应的 partition 是否被删除或者分配。

  1. consumer.handler(record -> {
  2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
  3. ",partition=" + record.partition() + ",offset=" + record.offset());
  4. });
  5. // 注册一个用于侦听新分配partition的Handler
  6. consumer.partitionsAssignedHandler(topicPartitions -> {
  7. System.out.println("Partitions assigned");
  8. for (TopicPartition topicPartition : topicPartitions) {
  9. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  10. }
  11. });
  12. // 注册一个用于侦听撤销partition的Handler
  13. consumer.partitionsRevokedHandler(topicPartitions -> {
  14. System.out.println("Partitions revoked");
  15. for (TopicPartition topicPartition : topicPartitions) {
  16. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  17. }
  18. });
  19. // subscribes to the topic
  20. consumer.subscribe("test", ar -> {
  21. if (ar.succeeded()) {
  22. System.out.println("Consumer subscribed");
  23. }
  24. });

加入某个 consumer group 的消费者,可以通过 unsubscribe 方法退出该消费组,从而不再接受到相关消息:

  1. consumer.unsubscribe();

当然你也可以在 unsubscribe 方法中传入一个 Handler 用于监听执行结果状态:

  1. consumer.unsubscribe(ar -> {
  2. if (ar.succeeded()) {
  3. System.out.println("Consumer unsubscribed");
  4. }
  5. });

从 Topic 的特定分区里接收消息

消费组内的消费者可以消费某个 topic 指定的 partition。如果某个消费者并不属于任何消费组,那幺整个程序就不能依赖 Kafka 的 re-balancing 机制去消费消息。

您可以通过 assign 方法请求分配指定的分区:

  1. consumer.handler(record -> {
  2. System.out.println("key=" + record.key() + ",value=" + record.value() +
  3. ",partition=" + record.partition() + ",offset=" + record.offset());
  4. });
  5. //
  6. Set<TopicPartition> topicPartitions = new HashSet<>();
  7. topicPartitions.add(new TopicPartition()
  8. .setTopic("test")
  9. .setPartition(0));
  10. // 要求分配到特定的topic以及partitions
  11. consumer.assign(topicPartitions, done -> {
  12. if (done.succeeded()) {
  13. System.out.println("Partition assigned");
  14. // 侦听分配结果
  15. consumer.assignment(done1 -> {
  16. if (done1.succeeded()) {
  17. for (TopicPartition topicPartition : done1.result()) {
  18. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  19. }
  20. }
  21. });
  22. }
  23. });

上面的 assignment 方法可以列出当前分配的 topic partition。

获取 Topic 以及分区信息

您可以通过 partitionsFor 方法获取指定 topic 的 partition 信息:

  1. consumer.partitionsFor("test", ar -> {
  2. if (ar.succeeded()) {
  3. for (PartitionInfo partitionInfo : ar.result()) {
  4. System.out.println(partitionInfo);
  5. }
  6. }
  7. });

另外,listTopics 方法可以列出消费者下的所有 topic 以及对应的 partition 信息:

  1. consumer.listTopics(ar -> {
  2. if (ar.succeeded()) {
  3. Map<String, List<PartitionInfo>> map = ar.result();
  4. map.forEach((topic, partitions) -> {
  5. System.out.println("topic = " + topic);
  6. System.out.println("partitions = " + map.get(topic));
  7. });
  8. }
  9. });

手动提交偏移量

在 Apache Kafka 中,消费者负责处理最新读取消息的偏移量(offset)。Consumer 会在每次从某个 topic partition 中读取一批消息的时候自动执行提交偏移量的操作。需要在创建 KafkaConsumer 时将 enable.auto.commit 配置项设为 true 来开启自动提交。

我们可以通过 commit 方法进行手动提交。手动提交偏移量通常用于确保消息分发的 at least once 语义,以确保消息没有被消费前不会执行提交。

  1. consumer.commit(ar -> {
  2. if (ar.succeeded()) {
  3. System.out.println("Last read message offset committed");
  4. }
  5. });

分区偏移量定位

Apache Kafka 中的消息是按顺序持久化在磁盘上的,所以消费者可以在某个 partition 内部进行偏移量定位(seek)操作,并从任意指定的 topic 以及 partition 位置开始消费消息。我们可以通过 seek 方法来更改读取位置对应的偏移量:

  1. TopicPartition topicPartition = new TopicPartition()
  2. .setTopic("test")
  3. .setPartition(0);
  4. // 指定offset位置10
  5. consumer.seek(topicPartition, 10, done -> {
  6. if (done.succeeded()) {
  7. System.out.println("Seeking done");
  8. }
  9. });

当消费者需要从 Stream 的起始位置读取消息时,可以使用 seekToBeginning 方法将 offset 位置设置到 partition 的起始端:

  1. TopicPartition topicPartition = new TopicPartition()
  2. .setTopic("test")
  3. .setPartition(0);
  4. // 将offset挪到分区起始端
  5. consumer.seekToBeginning(Collections.singleton(topicPartition), done -> {
  6. if (done.succeeded()) {
  7. System.out.println("Seeking done");
  8. }
  9. });

最后我们也可以通过 seekToEnd 方法将 offset 位置设置到 partition 的末端:

  1. TopicPartition topicPartition = new TopicPartition()
  2. .setTopic("test")
  3. .setPartition(0);
  4. // 将offset挪到分区末端
  5. consumer.seekToEnd(Collections.singleton(topicPartition), done -> {
  6. if (done.succeeded()) {
  7. System.out.println("Seeking done");
  8. }
  9. });

偏移量查询

你可以利用 Kafka 0.10.1.1 引入的新的API beginningOffsets 来获取给定分区的起始偏移量。这个跟上面的 seekToBeginning 方法有一个地方不同:beginningOffsets 方法不会更改 offset 的值,仅仅是读取(只读模式)。

  1. Set<TopicPartition> topicPartitions = new HashSet<>();
  2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
  3. topicPartitions.add(topicPartition);
  4. consumer.beginningOffsets(topicPartitions, done -> {
  5. if(done.succeeded()) {
  6. Map<TopicPartition, Long> results = done.result();
  7. results.forEach((topic, beginningOffset) ->
  8. System.out.println("Beginning offset for topic="+topic.getTopic()+", partition="+
  9. topic.getPartition()+", beginningOffset="+beginningOffset));
  10. }
  11. });
  12. // partition offset 查询辅助方法
  13. consumer.beginningOffsets(topicPartition, done -> {
  14. if(done.succeeded()) {
  15. Long beginningOffset = done.result();
  16. System.out.println("Beginning offset for topic="+topicPartition.getTopic()+", partition="+
  17. topicPartition.getPartition()+", beginningOffset="+beginningOffset);
  18. }
  19. });

与此对应的API还有 endOffsets 方法,用于获取给定分区末端的偏移量值。与 seekToEnd 方法相比,endOffsets 方法不会更改 offset 的值,仅仅是读取(只读模式)。

  1. Set<TopicPartition> topicPartitions = new HashSet<>();
  2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
  3. topicPartitions.add(topicPartition);
  4. consumer.endOffsets(topicPartitions, done -> {
  5. if(done.succeeded()) {
  6. Map<TopicPartition, Long> results = done.result();
  7. results.forEach((topic, endOffset) ->
  8. System.out.println("End offset for topic="+topic.getTopic()+", partition="+
  9. topic.getPartition()+", endOffset="+endOffset));
  10. }
  11. });
  12. consumer.endOffsets(topicPartition, done -> {
  13. if(done.succeeded()) {
  14. Long endOffset = done.result();
  15. System.out.println("End offset for topic="+topicPartition.getTopic()+", partition="+
  16. topicPartition.getPartition()+", endOffset="+endOffset);
  17. }
  18. });

Kafka 0.10.1.1 还提供了一个根据时间戳(timestamp)来定位 offset 的API方法 offsetsForTimes,调用此API可以返回大于等于给定时间戳的 offset。因为 Kafka 的 offset 低位就是时间戳,所以 Kafka 很容易定位此类offset。

  1. Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
  2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
  3. // 我们只想要60秒之前的消息的offset
  4. long timestamp = (System.currentTimeMillis() - 60000);
  5. topicPartitionsWithTimestamps.put(topicPartition, timestamp);
  6. consumer.offsetsForTimes(topicPartitionsWithTimestamps, done -> {
  7. if(done.succeeded()) {
  8. Map<TopicPartition, OffsetAndTimestamp> results = done.result();
  9. results.forEach((topic, offset) ->
  10. System.out.println("Offset for topic="+topic.getTopic()+
  11. ", partition="+topic.getPartition()+"\n"+
  12. ", timestamp="+timestamp+", offset="+offset.getOffset()+
  13. ", offsetTimestamp="+offset.getTimestamp()));
  14. }
  15. });
  16. consumer.offsetsForTimes(topicPartition, timestamp, done -> {
  17. if(done.succeeded()) {
  18. OffsetAndTimestamp offsetAndTimestamp = done.result();
  19. System.out.println("Offset for topic="+topicPartition.getTopic()+
  20. ", partition="+topicPartition.getPartition()+"\n"+
  21. ", timestamp="+timestamp+", offset="+offsetAndTimestamp.getOffset()+
  22. ", offsetTimestamp="+offsetAndTimestamp.getTimestamp());
  23. }
  24. });

流量控制

Consumer 可以对消息流进行流量控制。如果我们读到一批消息,需要花点时间进行处理则可以暂时暂停(pause)消息的流入(这里实际上是把消息全部缓存到内存里了);等我们处理了差不多了,可以再继续消费缓存起来的消息(resume)。

我们可以利用 pause 方法和 resume 方法来进行流量控制:

  1. TopicPartition topicPartition = new TopicPartition()
  2. .setTopic("test")
  3. .setPartition(0);
  4. //注册一个handler处理进来的消息
  5. consumer.handler(record -> {
  6. System.out.println("key=" + record.key() + ",value=" + record.value() +
  7. ",partition=" + record.partition() + ",offset=" + record.offset());
  8. // 如果我们读到partition0的第5个offset
  9. if ((record.partition() == 0) && (record.offset() == 5)) {
  10. // 则暂停读取
  11. consumer.pause(topicPartition, ar -> {
  12. if (ar.succeeded()) {
  13. System.out.println("Paused");
  14. // 5秒后再恢复,继续读取
  15. vertx.setTimer(5000, timeId -> {
  16. // resumi read operations
  17. consumer.resume(topicPartition);
  18. });
  19. }
  20. });
  21. }
  22. });

关闭 Consumer

关闭 Consumer 只需要调用 close 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放相关资源。

由于 close 方法是异步的,你并不知道关闭操作什幺时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

  1. consumer.close(res -> {
  2. if (res.succeeded()) {
  3. System.out.println("Consumer is now closed");
  4. } else {
  5. System.out.println("close failed");
  6. }
  7. });

发送消息到某个 Topic

您可以利用 write 方法来向某个 topic 发送消息(records)。

最简单的发送消息的方式是仅仅指定目的 topic 以及相应的值而省略消息的 key 以及分区。在这种情况下,消息会以轮询(round robin)的方式发送到对应 topic 的所有分区上。

  1. for (int i = 0; i < 5; i++) {
  2. // 这里指定了topic和 message value,以round robin方式发送的目标partition
  3. KafkaProducerRecord<String, String> record =
  4. KafkaProducerRecord.create("test", "message_" + i);
  5. producer.write(record);
  6. }

您可以通过绑定 Handler 来接受发送的结果。这个结果其实就是一些元数据(metadata),包含消息的 topic、目的分区 (destination partition) 以及分配的偏移量 (assigned offset)。

  1. for (int i = 0; i < 5; i++) {
  2. KafkaProducerRecord<String, String> record =
  3. KafkaProducerRecord.create("test", "message_" + i);
  4. producer.write(record, done -> {
  5. if (done.succeeded()) {
  6. RecordMetadata recordMetadata = done.result();
  7. System.out.println("Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
  8. ", partition=" + recordMetadata.getPartition() +
  9. ", offset=" + recordMetadata.getOffset());
  10. }
  11. });
  12. }

如果希望将消息发送到指定的分区,你可以指定分区的标识(identifier)或者设定消息的 key:

  1. for (int i = 0; i < 10; i++) {
  2. // 这里指定了 partition 为 0
  3. KafkaProducerRecord<String, String> record =
  4. KafkaProducerRecord.create("test", null, "message_" + i, 0);
  5. producer.write(record);
  6. }

因为 Producer 可以使用消息的 key 作为 hash 值来确定 partition,所以我们可以保证所有的消息被发送到同样的 partition 中,并且是有序的。

  1. for (int i = 0; i < 10; i++) {
  2. // i.e. defining different keys for odd and even messages
  3. int key = i % 2;
  4. //这里指明了key,所有的消息将被发送同一个partition.
  5. KafkaProducerRecord<String, String> record =
  6. KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);
  7. producer.write(record);
  8. }

注意:可共享的 Producer 通过 createShared 方法创建。它可以在多个 Verticle 实例之间共享,所以相关的配置必须在创建 Producer 的时候定义。

共享 Producer

有时候您希望在多个 Verticle 或者 Vert.x Context 下共用一个 Producer。您可以通过 KafkaProducer.createShared 方法来创建可以在 Verticle 之间安全共享的 KafkaProducer 实例:

  1. KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);
  2. // 关闭
  3. producer1.close();

返回的 KafkaProducer 实例将复用相关的资源(如线程、连接等)。使用完 KafkaProducer 后,直接调用 close 方法关闭即可,相关的资源会自动释放。

关闭 Producer

与关闭 Consumer 类似,关闭 Producer 只需要调用 close 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放所有相关资源。

由于 close 方法是异步的,你并不知道关闭操作什幺时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

  1. producer.close(res -> {
  2. if (res.succeeded()) {
  3. System.out.println("Producer is now closed");
  4. } else {
  5. System.out.println("close failed");
  6. }
  7. });

获取 Topic Partition 的相关信息

您可以通过 partitionsFor 方法获取指定 topic 的分区信息。

  1. producer.partitionsFor("test", ar -> {
  2. if (ar.succeeded()) {
  3. for (PartitionInfo partitionInfo : ar.result()) {
  4. System.out.println(partitionInfo);
  5. }
  6. }
  7. });

错误处理

您可以利用 KafkaProducer#exceptionHandler 方法和 KafkaConsumer#exceptionHandler 方法来处理 Kafka 客户端(生产者和消费者)和 Kafka 集群之间的错误(如超时)。比如:

  1. consumer.exceptionHandler(e -> {
  2. System.out.println("Error = " + e.getMessage());
  3. });

随 Verticle 自动关闭

如果您是在 Verticle 内部创建的 Consumer 和 Producer,那幺当对应 Verticle 被卸载(undeploy)的时候,相关的 Consumer 和 Producer 会自动关闭。

使用 Vert.x 自带的序列化与反序列化机制

Vert.x Kafka Client 自带现成的序列化与反序列化机制,可以处理 BufferJsonObjectJsonArray 等类型。

KafkaConsumer 里您可以使用 Buffer

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
  4. config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
  5. config.put("group.id", "my_group");
  6. config.put("auto.offset.reset", "earliest");
  7. config.put("enable.auto.commit", "false");
  8. // 创建一个可以反序列化成jsonObject的consumer.
  9. config = new HashMap<>();
  10. config.put("bootstrap.servers", "localhost:9092");
  11. config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
  12. config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
  13. config.put("group.id", "my_group");
  14. config.put("auto.offset.reset", "earliest");
  15. config.put("enable.auto.commit", "false");
  16. // 创建一个可以反序列化成jsonArray的consumer.
  17. config = new HashMap<>();
  18. config.put("bootstrap.servers", "localhost:9092");
  19. config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
  20. config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
  21. config.put("group.id", "my_group");
  22. config.put("auto.offset.reset", "earliest");
  23. config.put("enable.auto.commit", "false");

同样在 KafkaProducer 中也可以:

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
  4. config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
  5. config.put("acks", "1");
  6. // 创建一个可以序列化成jsonObject的Producer.
  7. config = new HashMap<>();
  8. config.put("bootstrap.servers", "localhost:9092");
  9. config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
  10. config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
  11. config.put("acks", "1");
  12. // 创建一个可以序列化成jsonArray的Producer.
  13. config = new HashMap<>();
  14. config.put("bootstrap.servers", "localhost:9092");
  15. config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
  16. config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
  17. config.put("acks", "1");

您也可以在 create 方法里指明序列化与反序列化相关的类。

比如创建 Consumer 时:

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("group.id", "my_group");
  4. config.put("auto.offset.reset", "earliest");
  5. config.put("enable.auto.commit", "false");
  6. // 创建一个可以反序列化成Buffer的Consumer
  7. KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);
  8. // 创建一个可以反序列化成JsonObject的Consumer
  9. KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);
  10. // 创建一个可以反序列化成JsonArray的Consumer
  11. KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

创建 Producer 时:

  1. Map<String, String> config = new HashMap<>();
  2. config.put("bootstrap.servers", "localhost:9092");
  3. config.put("acks", "1");
  4. // 创建一个可以序列化成Buffer的Producer.
  5. KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);
  6. // 创建一个可以序列化成jsonObject的Producer.
  7. KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);
  8. // 创建一个可以序列化成jsonArray的Producer.
  9. KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

RxJava API

Vert.x Kafka Client 组件也提供Rx风格的API。

译者注:此处也可以参考 Kafka Stream 相关的 API。

  1. Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();
  2. observable
  3. .map(record -> record.value())
  4. .buffer(256)
  5. .map(
  6. list -> list.stream().mapToDouble(n -> n).average()
  7. ).subscribe(val -> {
  8. //获取到一个平均值
  9. });

流实现与 Kafka 原生对象

如果您希望直接操作原生的 Kafka record,您可以使用原生的 Kafka 流式对象,它可以处理原生 Kafka 对象。

KafkaReadStream用于读取 topic partition。它是 ConsumerRecord 对象的可读流对象,读到的是 ConsumerRecord 对象。

KafkaWriteStream用于向某些 topic 中写入数据。它是 ProducerRecord 对象的可写流对象。

API通过这些接口将这些方法展现给用户,其他语言版本也应该类似。


原文档最后更新于 2017-03-15 15:54:14 CET