API

优质
小牛编辑
130浏览
2023-12-01

2. API

Apache Kafka包含了新的Java客户端(在org.apache.kafka.clients package包)。它的目的是取代原来的Scala客户端,但是为了兼容它们将并存一段时间。老的Scala客户端还打包在服务器中,这些客户端在不同的jar保证并包含着最小的依赖。

producerapi

我们鼓励所有新的开发都使用新的Java生产者。这个客户端经过了生产环境测试并且通常情况它比原来Scals客户端更加快速、功能更加齐全。你可以通过添加以下示例的Maven坐标到客户端依赖中来使用这个新的客户端(你可以修改版本号来使用新的发布版本):

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.10.0.0</version>
  5. </dependency>

生产者的使用演示可以在这里找到javadocs

对老的Scala生产者API感兴趣的人,可以在这里找到相关信息。

consumerapi

在0.9.0发布时我们添加了一个新的Java消费者来取代原来的上层的(high-level)基于ZooKeeper的消费者和底层的(low-level)消费者API。这个客户端被认为是测试质量(beta quality)。为了保证用户能平滑的升级,我们还会维护老的0.8的消费者客户端能与0.9的集群协作。在下面的章节中我们将分别介绍老的0.8消费者API(包括上层消费者连机器和底层简单消费者)和新的Java消费者API。

highlevelconsumerapi

  1. class Consumer {
  2. /**
  3. * Create a ConsumerConnector
  4. * 创建一个ConsumerConnector
  5. *
  6. * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
  7. * connection string zookeeper.connect.
  8. * 配置参数最少要设置此消费者的groupid和Zookeeper的连接字符串Zookeeper.connect
  9. */
  10. public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
  11. }
  12. /**
  13. * V: type of the message 消息的类型
  14. * K: type of the optional key associated with the message 消息可选的key的类型
  15. */
  16. public interface kafka.javaapi.consumer.ConsumerConnector {
  17. /**
  18. * Create a list of message streams of type T for each topic.
  19. * 为每个topic创建一个T类型的消息流
  20. *
  21. * @param topicCountMap a map of (topic, #streams) pair
  22. * (topic, #streams)对的Map
  23. * @param decoder a decoder that converts from Message to T
  24. * 将消息转换为T类型的解码器
  25. * @return a map of (topic, list of KafkaStream) pairs.
  26. * The number of items in the list is #streams. Each stream supports
  27. * an iterator over message/metadata pairs.
  28. * 返回一个(topic, KafkaStream列表)对的Map。list的元素个数为#streams。每个stream都支持一个对message/metadata对的迭代器。
  29. */
  30. public <K,V> Map<String, List<KafkaStream<K,V>>>
  31. createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
  32. /**
  33. * Create a list of message streams of type T for each topic, using the default decoder.
  34. * 使用默认的解码器为每个topic创建一个T类型的消息流
  35. */
  36. public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
  37. /**
  38. * Create a list of message streams for topics matching a wildcard.
  39. * 为符合通配符的topics创建一个消息流列表
  40. *
  41. * @param topicFilter a TopicFilter that specifies which topics to
  42. * subscribe to (encapsulates a whitelist or a blacklist).
  43. * 指明哪些topic被订阅的topic过滤器(封装一个白名单或者黑名单)
  44. * @param numStreams the number of message streams to return.
  45. * 将返回的消息流的数量
  46. * @param keyDecoder a decoder that decodes the message key
  47. * 用于解码消息键的解码器
  48. * @param valueDecoder a decoder that decodes the message itself
  49. * 解码消息的解码器
  50. * @return a list of KafkaStream. Each stream supports an
  51. * iterator over its MessageAndMetadata elements.
  52. * KafkaStream的列表。每个流支持一个遍历消息及元数据元素的迭代器
  53. */
  54. public <K,V> List<KafkaStream<K,V>>
  55. createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
  56. /**
  57. * Create a list of message streams for topics matching a wildcard, using the default decoder.
  58. * 使用默认的解码器为符合通配符的topic创建消息流列表
  59. */
  60. public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
  61. /**
  62. * Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
  63. * 使用一个流和默认的解码器为符合通配符的topic创建消息流列表
  64. */
  65. public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
  66. /**
  67. * Commit the offsets of all topic/partitions connected by this connector.
  68. * 提交连接到这个连接器的所有topic/partition的偏移量
  69. */
  70. public void commitOffsets();
  71. /**
  72. * Shut down the connector
  73. * 关闭这个连接器
  74. */
  75. public void shutdown();
  76. }

你可以参见这个示例来学习如何使用高层消费者api。

simpleconsumerapi

  1. class kafka.javaapi.consumer.SimpleConsumer {
  2. /**
  3. * Fetch a set of messages from a topic.
  4. * 从一个topic上拉取抓取一堆消息
  5. *
  6. * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
  7. * request指定topic名称,topic分区,起始的比特偏移量,最大的抓取的比特量
  8. * @return a set of fetched messages
  9. * 抓取的消息集合
  10. */
  11. public FetchResponse fetch(kafka.javaapi.FetchRequest request);
  12. /**
  13. * Fetch metadata for a sequence of topics.
  14. * 抓取一个topic序列的元数据
  15. *
  16. * @param request specifies the versionId, clientId, sequence of topics.
  17. * request指明versionId,clientId,topic序列
  18. * @return metadata for each topic in the request.
  19. * request中的每个topic的元数据
  20. */
  21. public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
  22. /**
  23. * Get a list of valid offsets (up to maxSize) before the given time.
  24. * 获取一个在指定时间前有效偏移量(到最大数值)的列表
  25. *
  26. * @param request a [[kafka.javaapi.OffsetRequest]] object.
  27. * 一个[[kafka.javaapi.OffsetRequest]]对象
  28. * @return a [[kafka.javaapi.OffsetResponse]] object.
  29. * 一个[[kafka.javaapi.OffsetResponse]]对象
  30. */
  31. public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
  32. /**
  33. * Close the SimpleConsumer.
  34. * 关闭SimpleConsumer
  35. */
  36. public void close();
  37. }

对于大多数应用,高层的消费者Api已经足够优秀了。一些应用需求的特性还没有暴露给高层消费者(比如在重启消费者时设置初始的offset)。它们可以取代我们的底层SimpleConsumer Api。这个逻辑可能更复杂一点,你可以参照这个示例

newconsumerapi

这个新的统一的消费者API移除了从0.8开始而来的上层和底层消费者API的差异。你可以通过添加如下示例Maven坐标来添加客户端jar依赖来使用此客户端。

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.10.0.0</version>
  5. </dependency>

关于消费者如何使用的示例在javadocs

streamsapi

我们在0.10.0的发布中添加了一个新的称为Kafka Streams客户端类库来支持用户实现存储于Kafka Topic的数据的流处理程序。Kafka流处理被认定为alpha阶段,它的公开API可能在后续的版本中变更。你可以通过添加如下的Maven坐标来添加流处理jar依赖,从而使用Kafka流处理(你可以改变版本为新的发布版本):

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-streams</artifactId>
  4. <version>0.10.0.0</version>
  5. </dependency>

如何使用这个类库的示例在javadocs给出(注意,被注解了@InterfaceStability.Unstable的类标明他们的公开API可能在以后的发布中变更并不保证前向兼容)。