当前位置: 首页 > 知识库问答 >
问题:

为什么Flink不重试操作员故障?

蔚丰
2023-03-14

我计划将我们的一个Spark应用程序迁移到Apache Flink。我试图了解它的容错特性。

我执行了以下代码,我看不到Flink实际上尝试重试任何任务(或子任务)。这可能会导致我丢失数据。我该怎么做才能确保每一次失败都能被Flink所覆盖?

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("file:///my-path", false))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(0, TimeUnit.SECONDS) // delay
    ))
    env.enableCheckpointing(10L)
    val text = env.socketTextStream(hostName, port)
    text
      .map { input =>
        List(input)
      }.setParallelism(1)
      .flatMap { input =>
        println(s"running for $input")
        List.fill(5)(input.head) ::: {
          println("throw exception here")
          throw new RuntimeException("some exception")
          List("c")
        }
      }

我希望在屏幕上多次看到抛出异常消息。但是,当我使用fixedDelayRestart时,它似乎只是忽略了此消息,并为其他消息继续。

共有1个答案

曹光霁
2023-03-14

这取决于您如何启动应用程序。

我假设您正在IDE中运行这个。在这种情况下,StreamExecutionEnvironment。getExecutionEnvironment(getExecutionEnvironment)html" target="_blank">返回一个本地流执行环境(LocalStreamExecutionEnvironment),该环境在一个进程中运行程序和所有Flink,即主进程(在Flink JobManager中)和工作进程(TaskManager)作为同一JVM进程中的线程启动。异常终止此单个进程。因此,没有剩下可以重新启动程序的Flink进程。

如果您想以容错方式运行程序,您需要将其提交到Flink环境,例如在您的本地计算机上运行的环境。下载Flink发行版,提取存档文件,然后运行./bin/start-cluster.sh。这将启动两个进程,一个主进程和一个工作进程。然后,您可以通过创建一个带有StreamExecutionE的远程执行环境来将程序提交到集群nvironment.create远程执行环境并将主机名和端口作为参数传递(请查看留档了解详细信息)。

请注意,异常仍将终止工作进程。因此,为了能够重新启动程序,您需要手动启动工作进程。在生产环境中,这通常由Kubernetes、纱线或Mesos负责。

顺便说一下,我们最近在Flink留档中添加了一个操作游乐场。这是一个基于Docker的沙盒环境,可以使用Flink的容错功能。我建议查看:Flink操作游乐场。

更多提示:

  • 10ms的检查点间隔非常短。
  • 文本套接字源不提供至少一次(或精确一次)保证。记录最多处理一次。
 类似资料:
  • 以下是MWE: 奇怪的是,无论我键入

  • 考虑以下两行代码: 对我来说,他们看起来很相似。但是,第二行被Java编译器(1.8)拒绝,消息是“不能推断SimpleFileVisitor的类型参数<>”。 谁能解释一下,有什么问题吗?

  • 问题内容: 在Python 3中,operator.or_等效于按位,而不是逻辑。为什么没有逻辑运算符? 问题答案: 在与运营商不能表示为,因为它们的功能短路行为: 在这些情况下,永远不会调用。 另一方面,假设的则必须调用,因为函数参数总是在调用函数之前进行求值。

  • 代码如下: 运行时,这是错误消息: 错误:不匹配'运算符*'(操作数类型是'std::__cxx11::字符串{aka std::__cxx11::basic_string 如何修复此错误并使程序正确运行?

  • 问题内容: 5 in [1, 2, 3, 4] == False False 我知道这是测试会员资格的一种奇怪方法,并且 是“正确”的方式。令我感到困惑的是,它的行为不同于两者 和 我错过了明显的事情吗?(在Python 2.7和Python 3.4中测试)。 为了澄清,我理解了最后三个片段。我在问 第一个 代码片段的行为,以及为什么与众不同。 问题答案: 这是一个链式比较。您可能知道您可以做 在

  • 问题内容: 在深入研究之后,我发现Stream和Collector之间存在许多重复的逻辑,这些逻辑违反了不要重复自己的原则,例如:jdk-9和中的Stream#map&Collectors#mapping,Stream#filter&Collectors#filtering。等等 但自从溪流遵守告诉,不要问得墨meter耳的法则/ 得墨Law律和集热器遵守继承构成原则看来,这是合理的。 我只能想到