当前位置: 首页 > 知识库问答 >
问题:

keyBy 是否在 Flink (scala) 中的并行任务之间对 DataStream 进行分区?

许振海
2023-03-14

我想在 Flink 中的输入数据流上应用 ProcessFunction(),以使用单个缓存对象处理每个传入元素。我的代码看起来像这样:

object myJob extends FlinkJob {
 private val myCache = InMemoryCache()

 private def updateCache(myCache,someValue) : Boolean = {//some code}

 private def getValue(myCache,someKey) : Boolean = {//some code}

 def run(params, executionEnv) : Unit = {
   val myStream = executionEnv.getStream()

   val processedStream = myStream.process(new ProcessFunction {
     def processElement(value,context,collector) : Unit = {
      //Update cache
      //Collect updated event
     }
   }

   processedStream.write()
 }
}

当我并行化此作业时,我假设作业的每个并行实例都有自己的缓存对象,因此,单个缓存键可能存在于多个缓存对象中。但是,我希望特定键有一个缓存条目,也就是说,对应于特定键的所有记录必须由单个实例和单个缓存对象处理。在 myStream 上使用 keyBy() 是否可确保所有具有相同键的传入事件都由 Flink 作业的单个并行任务/实例处理,因此也由单个缓存对象处理?

共有2个答案

堵凯
2023-03-14

我认为你应该用状态来代替对象。

具有相同键的所有事件都将访问相同的状态,因此访问相同的值。修改其中一个状态不会影响其他键状态。

葛学民
2023-03-14

是的,keyBy 保证具有相同键的每个事件都将由同一运算符实例处理。这对于高吞吐量、低延迟的有状态流处理至关重要。

这使得 flink 的状态是本地的,这使得它易于使用且速度更快。计时器还利用此键控分区。

使用 Flink 的键控状态可能比使用缓存对象要好得多。

 类似资料:
  • 我们使用带有水印的周期事件时间窗口。我们目前在Flink应用程序中有4个并行任务。 在流式处理过程中,所有4个任务的水印值都必须接近触发窗口事件。 例如 任务1水印值=8 任务2水印值=1 任务3水印值=8 任务4水印值=8 任务2正在等待日志更新其水印。但是,这种情况可能发生在任务2更新之前,我们希望在更新之前触发窗口事件。 是否有任何机制可以在不等待其他任务的情况下对齐所有并行任务的水印或触发

  • 我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。 我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。

  • 我是Flink的新人。我正在编写一个简单的Flink POC程序,在这里我能够获得预期的输出。但我无法获得钥匙和车窗操作的内部信息。以下是我的代码, 在部署Flink作业时,我在Flink UI中看到以下图表, 任务展示台 从上图中,我完全理解了它使用了2个任务和4个插槽,每个任务有2个并行性。第一个任务有源,pojo映射器第二个任务有sum函数,sink函数。 现在的问题是, > KeyBy和W

  • 问题内容: 我有一个表,其中包含开始时间(在示例中使用数字以使其保持简单)以及事件的持续时间。 我想确定“块”及其开始时间和结束时间。 每当前一行的结束时间(开始时间+持续时间)(按开始时间排序)与当前行的开始时间之间的差值为时,应开始一个新的“块”。 这是我的测试数据,包括在注释中尝试进行图形解释的尝试: 第一个块开始于,结束于。由于与下一行的区别是,开始另一个块,终止于。 我可以使用来识别块的

  • 我想像这样在中使用一个不可序列化的对象 它非常低效,因为我创建了许多实例。实际上,它只能在每个工作器中创建一次。 在Spark中,我可以使用mapPartition来执行此操作。但在flink stream api中,我不知道。