当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:接收器是否存储在检查点进入检查点状态期间从流缓冲的项目?

谢典
2023-03-14

我正在开发一个遗留的Flink管道,我们希望更改我们正在使用的接收器的实现。我们正在运行Flink 1.10,试图从BucketingSink过渡到StreamingFileSink,两者都在将ORC写入相同的目标。我们的管道非常简单:我们将一些kakfa流合并到单个接收器中(没有其他操作员)。

部署测试期间,我注意到当我们从sink1切换到sink2(反之亦然)时,我们最终会在我们正在编写的文件中丢失kafka消息(通过hive/trino查询)。丢失消息的kafka时间戳与我的部署一致,所以我相信这不是一些无关的上游问题。

我目前的理论是,在检查点期间,接收器在等待所有检查点屏障时缓冲来自流的消息,这些缓冲的事件被捕获为该接收器的检查点状态的一部分,kafka消息来源认为这些偏移已被交付/处理(即使它们没有被写入文件,而只存在于接收器的缓冲区中)。因此,当我使用不同的接收器进行部署,并从使用旧接收器创建的检查点开始时,这些缓冲的消息会丢失。我正在寻找确认这些接收器是否将缓冲事件写入检查点状态,并将导致kafka源将它们视为“已处理”,即使它们尚未写入文件。

我们的时间线看起来像:

┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  ┌────────┐  ┌──────┐
│Sink1 │  │Sink1 │  │Sink1 │  │Deploy│  │ Resume │  │Sink2 │
│ CP1  │─▶│ CP2  │─▶│ CP3  │─▶│Sink2 │─▶│from CP3│─▶│ CP4  │
└──────┘  └──────┘  └──────┘  └──────┘  └────────┘  └──────┘

最后,在写入ORC文件的Kafka消息中,“Sink1 CP3”和“Sink2 CP4”之间出现了一个缺口。所以我相信Kafka源中的Kafka偏移量有所提高(尽管我们的源没有任何变化),所以Kafka源认为我们已经处理了这些缓冲消息,在我们从CP3恢复后不会将它们发送到Sink2。陌生人:如果我返回到Sink1并从CP4恢复,那么在CP3和CP4之间丢失的事件就会被写入!并且不会写入重复的事件,因此不会将Kafka源倒回到旧的偏移量,也不会在CP3之后重新处理所有消息。

那么,我的思路是否正确,Kafka源是否为缓冲消息提前了偏移量?有没有一种方法可以安全地从一个接收器过渡到另一个接收器,而不会丢失这些Kafka信息的狭窄片段?

共有1个答案

吕修伟
2023-03-14

是的,在checkpoint flink使用状态存储后端期间,请检查保存点功能以避免丢失数据https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

什么是保存点?保存点与检查点有何不同保存点是通过Flink的检查点机制创建的流作业执行状态的一致映像。您可以使用保存点停止和恢复、分叉或更新Flink作业。保存点由两部分组成:一个在稳定存储中包含(通常较大)二进制文件的目录(例如HDFS、S3等)和一个(相对较小)元数据文件。稳定存储上的文件表示作业执行状态映像的净数据。保存点的元数据文件以相对路径的形式包含(主要)指向稳定存储中作为保存点一部分的所有文件的指针。

 类似资料:
  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)

  • 我从我的网络商店API调用中获取XML,其结构非常像下面的示例: 其中有更多的参数,数量取决于许多外部因素。我正在尝试获取用户名,当id“111”和id“112”的值是我要查找的值时 上述代码按预期返回“userOne”和“userTwo”。 问题是,Id“111”要么有值“Param 1 is on”要么什么都没有,如果没有值,它就不会显示在XML中。所以我需要一个表达式来检查id为“111”的

  • 我有一个用lat/long表达的观点 我有一个以米表示的半径值。我需要检查另一个点,也用拉特/long表示,是否在圆内。 如果我在平面上,我可以简单地使用公式 正如这些答案所深刻解释的那样。 然而,根据纬度/经度的用法,我不能使用这个公式,因为行星是球形的。 如何计算从任何给定点到中心的距离以与半径进行比较?

  • 我想在flink中测试一次端到端的处理。我的工作是: Kafka资料来源- 我在mapper1中放了一个< code > thread . sleep(100000),然后运行了这个作业。我在停止作业时获取了保存点,然后从mapper1中删除了< code > thread . sleep(100000),我希望该事件应该会被重放,因为它没有下沉。但这并没有发生,乔布斯正在等待新的事件。 我的Ka

  • 我试图检查/保存我在EMR上运行的flink状态到AWS上的s3存储桶。请注意: 实例(主节点和核心节点)正确设置了IAM角色,以访问s3 bucket及其内部的所有目录/文件(AmazonS3FullAccess策略附加到该角色,没有任何内容覆盖它) jobmanager日志:

  • 如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定