Flink kafka消费者有两种类型的消费者,例如:
这两个消费者层次结构扩展了相同的FlinkKafkaConsumerBase
类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种?
我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/html" target="_blank">streaming/connectors/kafka/FlinkKafkaConsumer.java#L72
编辑:(以下链接已停止工作,因为此文件已在主分支中删除)。https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java#L77
版本化的Kafka消费者(和生产者)是根据Kafka客户端的这些版本构建的,并且每个版本都将与Kafka的特定版本一起使用。未版本的连接器——FlinkKafkaConsumer和FlinkkafKapProducer——是使用通用客户端库构建的,并且自0.10以来与Kafka的所有版本兼容。
请注意,Flink 1.11中删除了Kafka0.8和0.9版本的消费者和生产者,而Flink 1.12中删除了0.10和0.11版本(https://issues.apache.org/jira/browse/FLINK-19152).
编辑:
在某些情况下,仅仅允许来自水槽的背压来节流源就足够了。但在其他情况下(例如,多个来源),可能工作得不够好。
你会发现一些关于这个的讨论,和代码做你自己的速率限制,在http://mail-archives.apache.org/mod_mbox/flink-user/202007.mbox/
大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(
我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我正在Java中实现一个简单的Kafka消费者。代码如下: 我在网上查看的任何文档都给出了range或roundrobin作为可能的分配策略,据我所知,groupId是一个自定义名称。不确定这里什么是正确的配置值。