当前位置: 首页 > 知识库问答 >
问题:

Apache Flink连接与连接

沈永新
2023-03-14

在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

共有1个答案

常睿范
2023-03-14

差异是相当显著的。在连接的情况下,它的工作原理或多或少类似于SQL内部连接,因此您需要提供将用于连接的字段,并且连接通过窗口进行计算。

因此,基本上,您定义了将用于联接的每个窗口的键以及将用于评估结果的窗口。ProcessJoinFunction 允许您在处理连接元素后对其进行处理,但您无法控制连接机制本身,即已连接元素对将被传递到 ProcessJoinFunction

在< code>connect的情况下,您还可以定义键(虽然您不必这样做),但是这些键将不会用于连接,而是用于控制通过操作符的并行实例的流和键控状态。因此,在connect的情况下,没有逻辑负责如何连接元素,而是为来自stream1的每个元素调用< code>processElement1方法,为来自stream2的每个元素调用< code>processElement2方法。所以,如果你想执行某种连接,你必须自己实现逻辑。

 类似资料:
  • 有人能告诉我在maven中scm连接和developerConnection之间的区别吗? 我正在尝试使用,它需要其中之一。 [错误]未能执行goal org . Apache . maven . plugins:maven-release-plugin:2 . 3 . 2:在项目was-topology-legacy-dsl上准备(default-cli ):缺少必需的设置:必须指定scm连接或

  • 在我的程序中,我正在访问wep api。最多可以有7个不同的线程访问web api的不同服务器。每个线程负责一个服务器,每个服务器速率限制每个线程。每个线程更新相同的mysql数据库。线程数保持不变。 在我的示例中,是否需要连接池?我不应该只打开7个不同的连接,这些连接将在程序的生命周期中打开吗?

  • 我应该在哪里打开和关闭到存储库的连接?在文章“不要重复DAO”中写道: DAO不负责处理事务、会话或连接。这些都是在DAO之外处理的,以实现灵活性。 但有些人建议我将对象注入DAO类,并处理DAO方法内部的所有连接。。。i、 e.每个CRUD操作都应打开和关闭与存储库的连接。

  • 我正在尝试使用pymongo连接到MAC中的mongo。我得到以下错误- 回溯(最近一次调用):文件“”,第1行,从pymongo导入MongoClient导入错误:无法导入名称“MongoClient” 我也试过连接。但它给出了同样的错误。有什么帮助吗?

  • 我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不

  • 问题内容: 我在JBoss服务器上使用Hibernate。我得到下面的错误。 当我尝试在同一会话中第二次连接到数据库时,发生错误。 另外,我还会收到错误消息“为您关闭连接。请关闭您的连接”。 可能是什么原因,如何解决这种情况? 问题答案: 自从您一年前提出这个问题以来,这个答案实际上可能为时已晚。但它将帮助将来会遇到此错误的人。 您的错误可能来自不同的来源,但在我的情况下,它的所有原因均与事务超时