当前位置: 首页 > 知识库问答 >
问题:

Flink kafka消费者落后

赫连棋
2023-03-14

我在Flink的工作中使用Kafka资料来源的信息流,一次阅读50个主题,如下所示:

FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<GenericRecord>(
            Pattern.compile("TOPIC_NAME[1-50].stream"), // getting data stream from all topics
            <DeserializationSchema>, //using avro schema
            properties); // auto.commit.interval.ms=1000 ...

然后有一些运算符,如:过滤器-

我能获得的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这相当低,我可以清楚地看到消费者落后于生产者。我甚至试着移除水槽和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到Amazon Kinesis data analytics,并尝试了几种并行设置,但这些设置似乎都无法提高吞吐量。

我遗漏了什么吗?

共有1个答案

贝杜吟
2023-03-14

有几件事会显着影响吞吐量。

无效的序列化通常是导致吞吐量低的一个主要因素。请参阅Flink Serialization Tuning Vol.1:选择您的序列化程序-如果可以的话,以获取有关此主题的详细信息。Avro通用记录序列化程序还不错,但您是否携带了实际不需要的数据?

您是否正在更改管道中的任何位置的并行性?那很昂贵。

对于Kinesis数据分析,您必须使用RocksDB状态后端,它的吞吐量远远低于基于堆的状态后端。但拥有正确的配置会有很大帮助。您应该为RocksDB工作目录使用可用的最快本地磁盘(SSD,或者在极端情况下,可能需要RAM磁盘)。确保实例类型提供足够的IOPs。给RocksDB足够的内存。如果您进行大量读取,那么Bloom过滤器值得启用。请参阅Flink中磁盘对RocksDB状态后端的影响:案例研究,以了解有关使用RocksDB的更多信息。

您可以尝试禁用检查点作为实验。如果这有帮助,这将提供一些线索。

某些网络设置会影响吞吐量。默认值通常提供了良好的性能,但如果您对其进行了修改,则值得研究。

 类似资料:
  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我正在Java中实现一个简单的Kafka消费者。代码如下: 我在网上查看的任何文档都给出了range或roundrobin作为可能的分配策略,据我所知,groupId是一个自定义名称。不确定这里什么是正确的配置值。