当前位置: 首页 > 知识库问答 >
问题:

Flink中KeyedBroadcastProcessFunction的键控状态是如何管理的?

谷梁宏恺
2023-03-14

我正在使用broadcaststate在Flink中执行流计算。我为我的作业定义了一个扩展KeyedBroadcastProcessFunction的类。假设我有一个通过(user_id,location)键控的流a,和一个流B,它被广播给所有执行程序,以使用我定义的类处理a中的元素。我知道我可以在这个类的ProcessBroadcastElementProcessElement中注册一个计时器,这样当它超时时,我可以通过调用state.clear()来删除特定密钥组的关联状态。之后我在想,这个重点群体还存在吗?

例如,在流A中,一个新消息带有(user_id=1,location='usa'),我们生成了这样的密钥组及其关联状态。之后,如果出现另一个带有(user_id=1,location='usa')的消息,它将触发processElement()并发出结果。

假设24小时后,我对这个密钥组不再感兴趣(user_id=1,location='usa'),我可以注册一个计时器来清除关联状态,但我对这个密钥组没有控制权。结果,24小时后,当另一个带有(user_id=1,location='usa')的消息来临时,由于这个密钥组仍然存在,因此仍然会调用processElement()。当作业运行时,尽管它们的关联状态将在24小时后被清除,但关键组是否会累积,或者这不应该成为内存使用的一个问题?

相关博客:https://www.da-platform.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink

共有1个答案

蒋芷阳
2023-03-14

Flink的键状态被组织成一个分布式(或分片)键值存储,其中的键可以是简单的东西,如整数和字符串,也可以是组合物,如(user_id=1,location='usa')。键组与复合键不同。密钥组是在Flink1.2(参见FLINK-3755)中引入的运行时构造,以允许对密钥值状态进行有效的重新缩放。密钥组是密钥空间的子集,并作为一个独立的单元进行检查点。在运行时,同一键组中的所有键在作业图中一起分区--每个子任务具有一个或多个完整键组的键值状态。这份设计文件给出了更多的细节。作为使用DataStream API的用户,关键组是一个实现细节,而不是您直接使用的东西。

对于keyedBroadcastProcessFunction中的计时器,可以在ProcessElementOnTimer方法中注册,但不能在ProcessBroadcastElement方法中注册。这是因为计时器总是与一个键相关联,而没有与广播元素相关联的键。但是,您可以在KeyedBroadcastProcessFunction.Context对象上使用ApplyTokeyedState方法,在ProcessBroadcastElement方法期间操作任何或所有键控状态。更多详情请参阅文档。

调用state.clear()后,该键的状态条目将被删除。当然,该键的新流事件可能会在状态被清除后到达,如果您愿意,您可以再次存储该键的值状态。为了避免由于为不再相关的键保持状态而导致的无限内存使用,您确实需要小心。您可能希望使用类似这样的逻辑,在每次创建状态后24小时使状态过期:

processElement:
  if state.value() is null, register timer
  state.update(...)

onTimer:
  state.clear()

或者您可能需要更复杂的逻辑,以便在更新或访问状态时延长状态的生存期。

另一个选择是使用状态生存时间特性。

更新:

    null
 类似资料:
  • Apache Flink-“keyby”中的异常处理 根据第一个链接,用户说他在processfn中使用sideoutput来捕获错误,我也在我的程序中使用sideoutput来发送与模式不匹配的数据,但是我不知道如何处理错误和无效数据到相同的sideoutput 根据第二个链接,用户正在尝试添加一个sink到keyby函数和null key和printsink函数,这是我完全不理解的 1)任何关

  • 我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?

  • Flink以何种格式保存运算符的托管状态(用于检查点或逻辑运算符之间的通信(即沿着作业图的边缘)? 文档内容如下 背景:我正在考虑从JSON切换到使用AVRO来将数据输入到我的源中,并将数据发送到我的接收器中。但是,由Avro创建的自动生成的POJO类相当嘈杂。因此,在作业图(用于Flink操作符之间的通信)中,我正在考虑使用像Avro这样的二进制序列化格式是否有任何性能优势。这可能对性能没有实质

  • 我在同一份flink jobs中读了两个Kafka主题。 :来自第一个主题的消息被保存到rocksdb,然后它将与Stream2联合。 :来自第二个主题的消息被Stream1保存的状态所丰富,然后它将与Stream1联合。 主题1和主题2是不同的来源,但两个来源的输出基本相同。我必须用topic1的数据来充实topic2的数据。 这里是流动; 这里是问题; 那个流量好吗? 可以访问由保存的相同的状

  • 我是刚接触flink的,我正在尝试编写junit测试用例来测试KeyedBroadcastProcessFunction。下面是我的代码,我当前正在调用TestUtils类中的getDataStreamOutput方法,并在输入数据根据模式规则列表求值后将inputdata和patternrules传递给方法,如果输入数据满足条件,我将获得信号并调用sink函数,并在getDataStreamOu

  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)