当前位置: 首页 > 知识库问答 >
问题:

Flink如何使用具有多个分区的Kafka主题中的消息,而不会产生扭曲?

臧烨烁
2023-03-14

假设一个主题有3个kafka分区,我希望我的事件按小时窗口,使用事件时间。

当某个分区位于当前窗口之外时,kafka使用者是否会停止读取该分区?还是打开一个新窗口?如果它正在打开新的窗口,那么,如果一个分区的事件时间与其他分区相比会非常倾斜,那么从理论上讲,它不可能打开无限数量的窗口,从而耗尽内存吗?当我们重播一些历史时,这种情况尤其可能发生。

我一直试图从阅读留档中得到这个答案,但是在分区上找不到太多关于Flink和Kafka的内部结构。非常欢迎关于这个特定主题的一些好的留档。

谢谢

共有2个答案

陶星辰
2023-03-14

你可以尝试使用这种风格

public void runStartFromLatestOffsets() throws Exception {
        // 50 records written to each of 3 partitions before launching a latest-starting consuming job
        final int parallelism = 3;
        final int recordsInEachPartition = 50;

        // each partition will be written an extra 200 records
        final int extraRecordsInEachPartition = 200;

        // all already existing data in the topic, before the consuming topology has started, should be ignored
        final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);

        // the committed offsets should be ignored
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
西门飞翮
2023-03-14

因此,首先,Kafka的所有事件都是不断读取的,进一步的窗口操作对此没有影响。在谈到内存不足时,需要考虑更多的事情。

  • 通常您不会为窗口存储每个事件,而只是为事件存储一些聚合
  • 每当窗口关闭时,相应的内存就会被释放。

更多关于Kafka消费者如何与EventTime交互的信息(特别是水印,您可以在此处查看

 类似资料:
  • 我想要任何关于Kafka如何维护消息序列的信息/解释,当消息被写入多个分区的主题时。例如,我有多个消息生成器,每个消息生成器按顺序生成消息,并用超过1个分区编写Kafka主题。在这种情况下,消费者组将如何工作来消费消息。

  • 我读了很多文章,但没有找到如何使用Spring Integration Kafka配置具有多分区主题(在运行时创建主题)的Producer。 我正在使用github链接来理解并为我的应用程序配置kafka。 请提供解决方案 还有一点,KafKaheader.MessageKey的用途是什么。 我得到空指针异常。下面是提及日志: 谢谢

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那

  • 我有一个带有4个分区的Kafka主题,因为我有一个带有4个消费者的消费者组。 我的目的是确保消息在分区之间均匀分布。 有没有办法验证kafka主题的跨分区消息分布?