当前位置: 首页 > 知识库问答 >
问题:

Flink interval用KafkaSource连接数据流会删除所有记录

何嘉运
2023-03-14

当前配置

  • 应用程序在Flink 1.14.4上运行
  • 应用程序中,数据流(一长串运算符的结果)与Kafka源连接
  • 每个分区的事件时间戳严格递增
  • Kafka上的每分区水印策略将水印设置为迄今为止看到的最大时间戳(无序度为1秒)
  • 默认情况下,Kafka源(间隔连接的右侧)比其他数据流(Kafka)提前一分钟左右

问题

  • 直到最近,我用新的KafkaSource类替换了不推荐使用的KafkCasumer类,这种设置一直运行良好
  • 间隔连接运算符的输出应该是(和曾经是)两个传入流(Kafka和DS)中的最小值,但对于KafkaSource,它在一定时间后被设置为最大值。因此,数据流中的所有记录都会被丢弃,因为其时间戳位于水印(=Kafka源)之后

问题

  • 当流速度不同步时,KafkaSource在窗口连接上的行为是否有所不同
  • 窗口连接运算符的水印可以是两个传入流中的最大值吗

共有1个答案

莘昊
2023-03-14

你的两个问题的答案是“不”和“不”。

与旧的FlinkKafkaConsumer相比,新的KafkaSource确实具有不同的水印行为,至少在一种情况下:当有空分区时。旧源在启动时检测空分区并自动将其标记为空闲,而新源仅在与 withIdleness 选项一起使用时检测空闲分区。

如果您有任何空的分区,这将解释为什么连接没有产生任何结果。

 类似资料:
  • DELETE命令用于从Cassandra表中删除数据。 您可以使用此命令删除完整的表或选定的行。 语法: 下面举个例子来演示如何从Cassandra表中删除数据。 我们有一个名为“”的表其中列(,, ),这个表中具有以下数据。 删除整行 要删除为的整行记录,请使用以下命令: 在执行上面语句之后,为 的行记录已被删除。 您可以使用SELECT命令验证它。 删除一个特定的列名 示例: 删除为的记录中的

  • 我指的是这个例子。 它使用jsPlumb版本2.4.3,我想删除所有连接,但它不起作用,要删除所有连接,我正在使用这段代码。 要删除单个连接,我使用下面的代码,但它显示。分离不是一个函数。 jsplumb是否存在任何版本问题。 任何帮助将高度赞赏。谢啦

  • 问题内容: 故事 我将编写一些代码来管理应用程序中已删除的项目,但是我将对其进行软删除,以便在需要时可以将其返回。在隐藏或删除项目时,我有一个层次结构要尊重我的应用程序的逻辑。 我顺理成章地将我的物品放在三个容器中,分别放入国家,城市,地区和品牌。每个项目都应该属于一个国家,城市,地区和品牌。现在,如果我删除了一个国家,则应该删除属于给定国家的城市,地区,品牌和商品。如果我删除了该城市,它还应该删

  • 问题内容: 对于如何迅速删除所有核心数据,我有些困惑。我创建了一个带有链接的按钮。单击按钮后,我将看到以下内容: 然后,我想出了各种方法来尝试删除所有核心数据内容,但似乎无法正常工作。我已经使用removeAll从存储的数组中删除,但是仍然不能从核心数据中删除。我假设我需要某种类型的for循环,但是不确定如何从请求中进行。 我尝试应用删除单行的基本原理 但是,这样做的问题是,当我单击按钮时,我没有

  • 问题内容: 我有一对多关系的两个数据库表。数据如下所示: 结果集: 我想删除所有应用程序,但最新的除外。换句话说,每个学生都只能将一个应用程序链接到该应用程序。使用上面的示例,数据应如下所示: 我将如何构造我的DELETE语句以过滤出正确的记录? 问题答案: 考虑到评论中的长时间讨论,请注意以下几点: 上面的语句 将 在任何正确实现语句级读取一致性的数据库上运行,而不管语句运行时对表的任何更改。

  • 问题内容: 我有一个这样的表: 我正在尝试删除具有3个以上具有相同ID的名称的记录,但要删除所有记录。因此,我试图得到这样的东西: 我不了解如何编写此查询。我已经达到了保留一个记录但没有记录阈值的程度: 给我: 有什么建议?哦,对了,我不在乎合并时会保留哪些记录。 问题答案: 您可以使用CTE做到这一点