当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink中处理瞬态/应用程序故障?

曾骁
2023-03-14

我的Flink处理器监听Kafka,处理器中的业务逻辑涉及调用外部REST服务,服务可能会停止。我想将元组重放回处理器中,是否仍有这样做的方法?我使用了Storm,我们将能够使元组失败,这样元组就不会被确认。因此,相同的元组将重播到处理器。

在Flink中,一旦消息被Flink Kafka消费者消费,元组就会被自动确认。有很多方法可以解决这个问题。其中一种方法是将消息发布回同一队列/重试队列。但我正在寻找一种类似于Storm的解决方案。

我知道Flink的保存点/检查点将用于容错。但据我所知,在Flink失败的情况下,元组将重演。我想知道如何处理暂时性故障。

谢谢你

共有1个答案

李跃
2023-03-14

与外部系统交互时,我建议使用Flink的异步I/O运算符。它允许您执行异步任务而不会阻止运算符的执行

如果要重试失败的操作,而不从最后一个成功的检查点重新启动Flink作业,那么我建议您自己实现重试策略。它可以如下所示:

new AsyncFunction<IN, OUT>() {
    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        FutureUtils
            .retrySuccessfulWithDelay(
                () -> triggerAsyncOperation(input),
                Time.seconds(1L),
                Deadline.fromNow(Duration.ofSeconds(10L)),
                this::decideWhetherToRetry,
                new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
            .whenComplete((result, throwable) -> {
                if (result != null) {
                    resultFuture.complete(Collections.singleton(result));
                } else {
                    resultFuture.completeExceptionally(throwable);
                }
            })
    }
}

使用triggerAsyncAction封装您的异步操作和decdeWhetherToRetry封装您的重试策略。如果decdeWhetherToRetry返回true,则结果未来将使用此操作尝试的值完成。

如果异常完成了resultFuture,那么它将触发故障转移,这将导致作业从最后一个成功的检查点重新启动。

 类似资料:
  • 问题内容: 我正在开发一个本机android应用程序,其中尝试使用2个开源库。问题是两个库都在各自的库中使用应用程序类。他们正在使用application标签下的“ android:name”在清单文件的相应源代码中注册这些类。问题是如何处理这种情况,因为众所周知,清单文件中只能使用ONE标签。我们是否可以在代码中注册/实例化应用程序类,以便我们在标记中仅提及一个库,而在代码中/实用地提及第二个库

  • 我正在探索Cadence,有一个关于故障恢复的问题。我知道工作流是容错的(工作流历史被维护),以防工作流工作人员失败。我找不到活动工作人员的相同保证。例如:假设一个活动对服务A进行了RPC调用,这改变了一些远程对象状态;现在,让我们假设调用成功了,但活动工作人员在通知Cadence服务之前丢失了。在这种情况下,Cadence会在一个新工作人员上再次安排活动吗? 我知道如果服务A是幂等的,上述可能不

  • 问题内容: 我正在将现有的应用程序移植到Flux,并且对一个主题有些困惑。假设我有几个API端点,它们返回两级或三级嵌套对象。 例如,可能返回架构的JSON响应 如您所见,有各种各样的用户处于不同的嵌套级别: 如果我想在获取文章时随时使用新数据进行更新,则必须编写一个怪异的方法来检查文章API响应上的所有嵌套实体。而且,将存在很多重复,因为还有其他API端点具有不同的架构,有时文章嵌入在用户内部(

  • 在SpringJSFWeb应用程序中将Netty客户端处理程序配置为消息接收点,有没有具体的方法? 如果一些独立的Java应用程序充当Netty服务器,我如何接收到SpringJSFWeb应用程序的消息?

  • 我有一个带post请求的控制器。我试图用一个简单的NotNull注释验证POJO。我正在使用ControllerAdvice来处理异常。 所以我尝试使用它,但当我启动应用程序时,我得到了以下信息: 因此,我想为BindException创建自己的处理程序,但当我为BindException类创建ExceptionHandler时,spring应用程序不会启动。如果我注释掉handleBindExc

  • 我试图抓住无效的json,而解析它与jiffy在牛仔web套接字处理程序。如果json是有效的/无效的,我想转发一个适当的消息到,它将回复客户端。这是我的代码。 这会导致运行时异常。 12:07:48.406[错误]牧场侦听器http已连接到进程 那我该怎么做呢?