当前位置: 首页 > 知识库问答 >
问题:

Apache Flink 中的 MapState

锺离浩慨
2023-03-14

我想在 MapState 中为 KeyedStream 中的每个密钥存储大约 500 个密钥。我不知道MapState是如何工作的。

当我访问MapState时,我必须读取所有密钥,然后更新其中一个。MapState在这里有效工作吗?

共有1个答案

鲜于致远
2023-03-14

当与 HashMapStateBackend 一起使用时,MapState 是一个内存中的哈希映射(在多版本、并发控制的哈希映射中)。当与EmbeddedRocksDBStateBackend一起使用时,MapState中的每个键/值对都是本地RocksDB实例中的单独键/值对。

请注意,MapState 有一个返回所有键的键方法,以及一个迭代器方法,用于迭代所有/值对。但是,无论您选择哪种状态后端,这两种操作都是昂贵的。

 类似资料:
  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 我注意到,每次我运行一个新作业时,它所花费的时间比我再次启动它时长20%左右? 如果一个作业运行多次,flink是否缓存一些结果并重用它们?如果是,我如何控制这一点? 我想测量我的任务运行了多长时间,但每次我重新运行它们时,速度都比以前快。

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的