当前位置: 首页 > 知识库问答 >
问题:

Flink stucks在创建检查点时

瞿和硕
2023-03-14

我有一份很轻松的工作,在创建检查站方面很吃力。它几乎没有州(除了一些Kafka偏移)。

工作本身有以下基本设置:

Kafka索资源-

迭代函数再次执行HTTP调用并转发成功的消息,丢弃4xx并重试5xx。从我的指标中可以看到,所有这些都发生了,我得到了一些5xx(返回迭代源)、一些4xx(忽略)和很多2xx(转发到HDFS)。

如果我查看线程转储,我可以看到某个任务被阻止了:

"Async calls on IterationSource-8 (1/1)" #123 daemon prio=5 os_prio=0 tid=0x00007f174000f800 nid=0x237 waiting for monitor entry [0x00007f17b32f5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:747)
    - waiting to lock <0x00000000ace0f128> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:683)
    at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1155)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这一个正在等待对象监视器,该监视器由以下对象保持:

"IterationSource-8 (1/1)" #63 prio=5 os_prio=0 tid=0x00007f17c00bf000 nid=0x1e0 in Object.wait() [0x00007f17b17d2000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:256)
    - locked <0x00000000acd030b0> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:184)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.performDefaultAction(StreamIterationHead.java:77)
    - locked <0x00000000ace0f128> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

仔细查看源代码,我可以看到第二个线程(持有锁)似乎处于某种无休止的循环中:

LocalBufferPool。java:

while (availableMemorySegments.isEmpty()) {
}

亲爱的Flink大师,有什么线索可以看哪个指标吗?我正在使用Flink 1.9.0。

提前感谢任何提示!

共有1个答案

颛孙越
2023-03-14

在Flink Sink中使用HTTP调用时,我遇到了类似的检查点。经过大量的跟踪和错误,我发现,如果每秒的接收速率低于输入速率,检查点就会被击中。

为此,我为源(输入)指定了1的并行度,为HTTP调用指定了8的并行度。

这不会在等待HTTP响应时阻塞线程,从而发生检查点。我也是Flink的新手,希望一些大师解释为什么在flink中使用HTTP调用时检查点会变慢。

 类似资料:
  • 我有一种感觉,我在这件事上完全错了。但无论如何。 我有一个sql数据库,它本质上是一个有目的的非规范化表,我构造这个表是为了使这项任务对我来说更容易,所以我可以从一个表中获取内容。 我有一张成双成对的桌子,像这样: 等等 所以对于我的neo4j图数据库,我希望每个用户ID作为一个节点,其他的东西不是太重要,但基本上是关系中的东西。 我只希望每个用户有一个节点,所以我的感觉是,如果我这样做: 当我们

  • 我正在处理一个Spring BootNeo4j应用程序 在保存新密码之前,如何编写自定义密码来检查Priviledge是否存在 UserRole域:@NodeEntity公共类UserRole{ 特权域:

  • 我在Amazon KDA上部署了一个Apache Beam应用程序。 它使用默认设置启用了检查点。 但在应用程序日志中,我可以看到: "存在依赖检查点的无限制源,但检查点被禁用。" 只有当我将作为运行时属性传递给我的应用程序时,它才会进行检查点。那么有必要显式传递这些值吗? 该应用程序基本上从Kinesis读取窗口数据,将其转换为大小约为30的固定持续时间,然后将数据发布回PubSub。 应用程序

  • 我有下一个接口和实现: 我需要截取类的所有方法的执行,它扩展了带有注释@步骤的类。请帮我写切入点。 例如,我使用下一个切入点来截取类的方法,它由@步骤注释: 但它不工作,如果我注释只有超级类

  • 主要内容:使用检查点恢复检查点(checkpoint)是一种机制,其中所有先前的日志都从系统中删除并永久存储在存储磁盘中。 检查点就像一个书签。 在执行事务时,标记此类检查点,然后使用事务的步骤执行事务,将创建日志文件。 当它到达检查点时,事务将更新到数据库中,直到那时,整个日志文件将从文件中删除。 然后使用新的事务步骤更新日志文件,直到下一个检查点,依此类推。 检查点用于声明DBMS处于一致状态之前的一个点,并且所有事

  • 问题内容: 如果我这样做: 然后创建文件,并始终返回“文件存在”。是否可以不创建文件就检查文件是否存在? 编辑: 我忘了提到它处于for循环中。所以这是真实的东西: 问题答案: 实例化a时,您并没有在磁盘上创建任何东西,而只是构建了一个可以调用某些方法的对象,例如。 既好又便宜,不要试图避免这种实例化。 该实例只有两个字段: 这是构造函数: 如您所见,实例只是路径的封装。创建它以便进行调用是继续进