当前位置: 首页 > 知识库问答 >
问题:

Flink:在Flink作业中处理异常的最佳方法是什么

孙佐
2023-03-14

我有一个瞬移工作,接受Kafka的主题,通过一堆操作员。我想知道什么是最好的方法来处理中间发生的异常。

假设存在异常,使用processFunction并在catch块中输出SideOutputContext,并在SideOutput调用外部服务以更新另一个相关作业状态的末尾为SideOutput提供单独的接收器函数

但是,我的问题是,通过这样做,我似乎仍然需要调用collector.collection()并传入一个空值,以便继续到下面的运算符并进入最后一个阶段,在这个阶段,SideOutput将流入单独的接收器函数。这样做对吗?

另外,我不确定如果不在操作符内部调用collector.collection()会发生什么,它会不会挂在那里导致内存泄漏?

共有1个答案

高才
2023-03-14

调用collector.collection()也可以。并且在使用边输出捕获异常时,不需要用null值调用collection()-每个运算符都可以有自己的边输出。最后,如果有多个这样的运算符带有一个用于异常的边输出,则可以在将该流发送到接收器之前,将边输出一起union()

如果下游操作员出于某种原因需要知道有异常,那么一种方法是输出或者 ,但是每个下游操作员当然需要有代码来检查它接收到的内容。

 类似资料:
  • 我有一个超时执行任务的方法。我使用ExecutorServer.submit()获取一个Future对象,然后调用future.get()并超时。这很好,但是我的问题是处理我的任务可能抛出的检查异常的最好方法。下面的代码工作正常,并且保留了被检查的异常,但是如果方法签名中被检查的异常的列表改变了,它看起来非常笨拙并且容易出错。 关于如何解决这个问题的任何建议?我需要以Java 5为目标,但我也很好

  • 进入Flink作业的数据可能会由于代码中的bug或缺乏验证而触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在Flink作业中使用,不会导致任何生产停机。 > 重新启动策略在这里似乎不适用,因为: null null 示例代码: 我想有能力跳过在“keyby”和类似的方法中导致问题的处理,这些方法应该返回一个结果。

  • Apache Flink-“keyby”中的异常处理 根据第一个链接,用户说他在processfn中使用sideoutput来捕获错误,我也在我的程序中使用sideoutput来发送与模式不匹配的数据,但是我不知道如何处理错误和无效数据到相同的sideoutput 根据第二个链接,用户正在尝试添加一个sink到keyby函数和null key和printsink函数,这是我完全不理解的 1)任何关

  • 问题内容: 我有一个方法可以执行一些超时任务。我使用ExecutorServer.submit()获取Future对象,然后使用超时调用future.get()。这工作正常,但是我的问题是处理可能由我的任务引发的检查异常的最佳方法。以下代码可以正常工作,并保留检查的异常,但是如果方法签名中的检查的异常列表发生更改,则显得非常笨拙且容易中断。 对于如何解决这个问题,有任何的建议吗?我需要以Java

  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?

  • 上周,我决定尝试Perl6,并开始重新实现我的一个程序。我不得不说,Perl6对于对象编程来说非常容易,这对我来说是Perl5非常痛苦的一个方面。 我的程序必须读取和存储大文件,如整个基因组(高达3 Gb或更多,见下面的例子1)或制表数据。 代码的第一个版本是通过逐行迭代以Perl5的方式制作的(“基因组.fa”。对于正确的执行时间,它非常慢且不可行。 所以经过一点RTFM之后,我改变了文件上的s