当前位置: 首页 > 知识库问答 >
问题:

Flink任务管理器在重启后不处理数据

严心水
2023-03-14

我是flink的新手,我部署了我的flink应用程序,它基本上执行简单的模式匹配。它部署在库伯内特斯集群中,具有1个JM和6个TM。我每10分钟向eventhub主题发送大小4.4k和200k消息并执行负载测试。我添加了重启策略和检查点,如下所示,我没有在代码中显式使用任何状态,因为没有要求

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 // start a checkpoint every 1000 ms
 env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
 // advanced options:
 // make sure 500 ms of progress happen between checkpoints
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig().setCheckpointTimeout(120000);
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 // allow job recovery fallback to checkpoint when there is a more recent savepoint
 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
         5, // number of restart attempts
         Time.of(5, TimeUnit.MINUTES) // delay
 ));

最初,我遇到了网络缓冲区的Netty服务器问题,我遵循了以下链接https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-网络内存每门浮动缓冲区flink网络和堆内存优化,并应用以下设置,一切正常

taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 1024mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 2mb
taskmanager.network.memory.floating-buffers-per-gate: 16
cluster.evenly-spread-out-slots: true
taskmanager.heap.size: 1024m
taskmanager.memory.framework.heap.size: 64mb
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.framework.off-heap.size: 64mb
taskmanager.memory.network.fraction: 0.4
taskmanager.memory.jvm-overhead.min: 256mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.4

但我有两个问题

>

对不起,如果我的理解是错误的,请纠正我,flink在我的代码中有一个重启策略,我限制了5次重启尝试。如果我的flink作业没有成功克服任务失败,整个flink作业将保持空闲状态,我必须手动重启作业,或者我可以添加任何机制来重启我的作业,即使它超过了重启作业尝试的限制。

是否有任何文档可以根据数据大小和系统接收数据的速率计算我应该分配给flink job cluster的内核和内存数量?

闪烁CEP优化技术是否有任何留档?

这是我在作业管理器中看到的错误堆栈跟踪

在模式匹配之前,我在job manager日志中看到以下错误

原因:组织。阿帕奇。Flink。运行时。io。网络内蒂。例外RemoteTransportException:远程任务管理器“/10.244.9.163:46377”意外关闭连接。这可能表示远程任务管理器丢失。位于组织。阿帕奇。Flink。运行时。io。网络内蒂。CreditBasedPartitionRequestClientHandler。位于组织的channelInactive(CreditBasedPartitionRequestClientHandler.java:136)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:257)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:243)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。位于org的fireChannelInactive(AbstractChannelHandlerContext.java:236)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。处理程序。编解码器。ByteToMessageDecoder。位于org的channelInputClosed(ByteToMessageDecoder.java:393)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。处理程序。编解码器。ByteToMessageDecoder。位于org的channelInactive(ByteToMessageDecoder.java:358)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:257)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:243)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。位于org的fireChannelInactive(AbstractChannelHandlerContext.java:236)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道DefaultChannelPipeline$HeadContext。位于组织的channelInactive(DefaultChannelPipeline.java:1416)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:257)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannelHandlerContext。org上的invokeChannelActive(AbstractChannelHandlerContext.java:243)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道DefaultChannelPipeline。组织中的fireChannelInactive(DefaultChannelPipeline.java:912)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道AbstractChannel$AbstractSafeture$8。在org上运行(AbstractChannel.java:816)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。util。同时发生的AbstractEventExecutor。safeExecute(AbstractEventExecutor.java:163)位于org。阿帕奇。Flink。阴影部分。净值4.io。内蒂。util。同时发生的SingleThreadEventExecutor。在org上运行所有任务(SingleThreadEventExecutor.java:416)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。频道nio。NioEventLoop。在org上运行(NioEventLoop.java:515)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。util。同时发生的SingleThreadEventExecutor 5美元。在org上运行(SingleThreadEventExecutor.java:918)。阿帕奇。Flink。阴影部分。净值4.io。内蒂。util。内部的ThreadExecutorMap 2美元。在java上运行(ThreadExecutorMap.java:74)。lang.Thread。运行(Thread.java:748)

提前谢谢,请帮我解决我的疑问

共有1个答案

司宏伯
2023-03-14

各点:

如果您的模式涉及匹配的时间序列(例如,“A后跟B”),那么您需要状态来执行此操作。大多数Flink的源和接收器也在内部使用状态来记录偏移等,如果您关心精确一次保证,则需要检查点此状态。如果模式是动态流式传输的,那么您也需要将模式存储在Flink状态中。

代码中的一些注释与配置参数不匹配:例如,“500 ms的进度”vs.1000,“检查点必须在一分钟内完成”vs.120000。此外,请记住,复制这些设置的文档部分并不是推荐最佳做法,而是演示如何进行更改。特别是,<代码>环境。getCheckpointConfig()。setPreferCheckpointForRecovery(true) 是个坏主意,配置选项可能不存在。

配置中的某些条目。yaml正在关注<代码>taskmanager。记忆力管理。分数相当大(0.7)--这只有在使用RocksDB时才有意义,因为托管内存对于流媒体没有其他用途。和taskmanager。记忆力网络分数和taskmanager。记忆力jvm开销。分数都非常大,这三个分数的和是1.5,这没有意义。

一般来说,默认网络配置在各种部署场景中都能很好地工作,除了在大型集群中(这里不是这种情况)之外,不需要调整这些设置。你遇到了什么样的问题?

关于您的问题:

>

  • 在TM故障和恢复后,TMs应自动从最近的检查点恢复处理。要诊断为什么不会发生这种情况,我们需要更多信息。要获得正确处理此问题的部署的经验,您可以使用Flink Operations Played进行实验。

    一旦配置的重启策略完成,作业将失败,Flink将不再尝试恢复该作业。当然,如果您想要更复杂的东西,您可以在Flink的RESTAPI上构建自己的自动化。

    容量规划文件?不,不是真的。这通常是通过反复试验来解决的。不同的应用程序往往以难以预测的方式具有不同的需求。您选择的序列化程序、状态后端、keyBys的数量、源和汇、密钥偏移、水印等都会产生重大影响。

    关于优化CEP的文档?不,对不起。要点是

    • 尽一切可能限制匹配;避免必须无限期保持状态的模式

  •  类似资料:
    • 我让Flink服务器在端口8081上本地运行。我在上面部署了我的flink应用程序的jar,它成功运行。 现在,我需要听取普罗米修斯的Flink指标,为此,我需要将作业管理器和任务管理器配置为普罗米修斯的目标。yml。 那么,我在哪里可以获得相同的端口?

    • 假设一个正在运行一个设置为并行的集群 使用每个任务槽运行多个TM 使用多个任务槽运行单个/几个TM 我们正在运行版本为1.6.3的Flink群集。Flink 1.7发行说明指出,“Flink现在正确地支持具有多个插槽的TaskManager”。在以前的版本中,是否建议从单个插槽开始?

    • 每个新创建的任务都是 org.gradle.api.DefaultTask 类型, org.gradle.api.Task 的标准实现,DefaultTask 所有的域都是私有的,意味着他们只能通过 setter 和 getter 方法来访问,庆幸的是Groovy提供了一些语法糖来允许你通过名字来使用域。 管理项目的版本 许多公司或者开源组织有他们自己的发布版本的措施,一般用主版本号和次版本号来表

    • 用 ceph-deploy 建立一个集群后,你可以把客户端管理密钥和 Ceph 配置文件发给其他管理员,以便让他用 ceph 命令管理集群。 创建一管理主机 要允许一主机以管理员权限执行 Ceph 命令,用 admin 命令: ceph-deploy admin {host-name [host-name]...} 分发配置文件 要把改过的配置文件分发给集群内各主机,可用 config push

    • 在San CLI中主要通过san serve和san build命令进行生产和开发环境的打包,San的脚手架工程内置了四个命令,包括启动本地服务、生产环境打包、打包分析、现代模式打包等,脚手架工程的package.json内可执行的命令有: npm run start // 打包+启动本地服务 npm run build // 打包发布 npm run analyzer // 打包分析 npm r

    • 任务管理(又称 构建)工具 任务管理/构建 工具: Gulp Grunt Tasking/build and more tools: Brunch Mimosa