当前位置: 首页 > 知识库问答 >
问题:

Apache Flink错误处理和条件处理

厍彭薄
2023-03-14

我是Flink的新手,已经通过网站/示例/博客开始学习。我正在努力正确使用操作符。基本上我有两个问题

问题1:Flink是否支持声明性异常处理,我需要处理解析/验证/。。。错误?

  • 我可以使用组织吗。阿帕奇。Flink。运行时。操作员。分类ExceptionHandler或类似的程序来处理错误
  • 还是Rich/FlatMap功能是我的最佳选择?如果Rich/FlatMap是唯一的选项,那么是否有办法在Rich/FlatMap函数中获取流的句柄,以便连接接收器进行错误处理

问题2:我可以有条件地附加不同的水槽吗?

  • 根据键控分割流中的特定字段,我需要选择不同的接收器,我是再次分割流还是使用富/平面图来处理

我正在使用Flink 1.3.2。这是我工作的相关部分

    .....
    .....
    DataStream<String> eventTextStream = env.addSource(messageSource)

    KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
            // parse, transform or enrich
            .flatMap(new MyParseTransformEnrichFunction())
            .assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
            .keyBy("eventId");

    // split stream based on eventType as different reduce and windowing functions need to be applied
    SplitStream<EventPojo> splitStream = eventPojoStream
            .split(new EventStreamSplitFunction());

    // need to apply reduce function
    DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");

    // need to apply reduce function
    DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");

    // need to apply time based windowing function
    DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");

    ....
    ....

    env.execute("Event Processing");      

我在这里使用的运算符是否正确?

更新1:

尝试使用@alpinegizmo建议的ProcessFunction,但这不起作用,因为它依赖于一个键控流,而我在解析/验证输入之前没有键控流。我得到“InvalidProgrameException:对于非复合类型,字段表达式必须等于'*'或'\u''”。

这是一个常见的用例,您的第一次解析/验证输入,但还没有键控流,所以如何解决它?

谢谢你的耐心和帮助。

共有1个答案

南宫凯康
2023-03-14

您忽略了一个关键的构建块。查看侧面输出。

此机制提供了一种类型安全的方式来生成任意数量的附加输出流。这可能是报告错误的干净方法,以及其他用途。在Flink中,1.3侧输出只能与ProcessFunction一起使用,但1.4会将侧输出添加到ProcessWindowFunction。

 类似资料:
  • 错误处理 有些方法通通过参数返回 error 的引用,使用这样的方法时应当检查方法的返回值,而非 error 的引用。 推荐: NSError *error = nil; if (![self trySomethingWithError:&error]) { // Handle Error } 此外,一些苹果的 API 在成功的情况下会对 error 参数(如果它非 NULL)写入垃圾值(

  • 如果 Puppeteer 方法无法执行一个请求,就会抛出一个错误。例如,page.waitForSelector(selector[, options]) 选择器如果在给定的时间范围内无法匹配节点,就会失败。 对于某些类型的错误,Puppeteer 使用特定的错误类处理。这些类可以通过 require('puppeteer/Errors') 获得。 支持的类列表: TimeoutError 一个处

  • 通过对错误类型实现 Display 和 From,我们能够利用上绝大部分标准库错误处理工具。然而,我们遗漏了一个功能:轻松 Box 我们错误类型的能力。 标准库会自动通过 Form 将任意实现了 Error trait 的类型转换成 trait 对象 Box<Error> 的类型(原文:The std library automatically converts any type that imp

  • 错误处理(error handling)是处理可能发生失败情况的过程。例如读取一个文件失败,然后继续使用这个失效的输入显然是有问题的。错误处理允许我们以一种显式的方式来发现并处理这类错误,避免了其余代码发生潜在的问题。 有关错误处理的更多内容,可参考官方文档的错误处理的章节。

  • 处理一个 RESTful API 请求时, 如果有一个用户请求错误或服务器发生意外时, 你可以简单地抛出一个异常来通知用户出错了。 如果你能找出错误的原因 (例如,所请求的资源不存在),你应该 考虑抛出一个适当的HTTP状态代码的异常 (例如, yii\web\NotFoundHttpException意味着一个404 HTTP状态代码)。 Yii 将通过HTTP状态码和文本发送相应的响应。 它还

  • Yii 内置了一个error handler错误处理器,它使错误处理更方便, Yii错误处理器做以下工作来提升错误处理效果: 所有非致命PHP错误(如,警告,提示)会转换成可获取异常; 异常和致命的PHP错误会被显示, 在调试模式会显示详细的函数调用栈和源代码行数。 支持使用专用的 控制器操作 来显示错误; 支持不同的错误响应格式; error handler 错误处理器默认启用, 可通过在应用的