当前位置: 首页 > 知识库问答 >
问题:

连接键控流上的coProcessFunction与keyedCoProcessFunction

嵇财
2023-03-14

在引用Flink代码示例时,我观察到当使用connect运算符连接两个流时,coProcessFunctionkeyedcoProcessFunction在对键控流操作时几乎可以互换和相同地扩展(覆盖processElement1processElement2ontimer),那么在对键控流操作时,与对keyedcoProcessFunction进行扩展以实现键控连接流的业务逻辑相比,扩展

谢谢你。

共有1个答案

商正诚
2023-03-14

KeyedCoProcessFunctionKeyedProcessFunction是最近添加的。与非键式风格相比的不同之处在于,当前键在context中可用,它被传递给各种processElement和onTimer方法。

如果您尝试在ProcessFunction或CoProcessFunction中使用键控状态或计时器,那么如果您实际处于键控上下文中,它将起作用,如果不是,它将抛出异常。

 类似资料:
  • Flink社区! 我有一个关于在Flink中连接相同键上的多个流的问题(等连接)。我还是一个新手,正在为我的团队评估Flink,将我们的Spark批处理应用程序迁移到流处理。 注意:我看了FabianHüske的这篇关于加入处理的文章:窥视Apache Flink的引擎室。 为了简化问题,假设您有3个流,每个流都有唯一的记录,可以通过id字段进行键控。对于流中的每条记录,您将在其他流中找到相应的记

  • 问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 问题内容: 我观察到有两种方法可以在多个表上实现目标。结果集中的一列将被更新,并且可能需要速度。结果集可以通过以下方式获得: 情况1: 或者 情况2: 两者给出的结果相同,只是连接条件有所不同。哪个运行/执行速度更快? eval_id为,report_type和course_name为。 对于所使用的开发人员,情况1具有以下统计信息:[SELECT-3077行,0.048秒]提取了结果集…执行了1

  • 我有4个实体列在下面的职位。客户端和产品已在数据库中。问题在于订单和订单产品。我创建Orders对象,然后添加客户机(已经在数据库中)和OrdersProducts的ArrayList,但出现了错误: 当我不添加客户机时,数据库中只有OrderProducts的ArrayList,但OrderProducts没有外键。怎么了?我使用Mysql服务器和hibernate。