当前位置: 首页 > 知识库问答 >
问题:

Watermark在Flink CEP中远远落后

丁良骏
2023-03-14
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

在Web UI中,有两个任务:第一个任务是源->过滤器->映射->时间戳/水印;第二个是cepoperator->sink。每个任务得到104个并行度。

子任务的工作负载不均匀,它应该来自keyby。子任务之间的水印是不同的,但是它们开始被固定在一个值上,在很长一段时间内没有变化。从日志中,我可以看到CEP不断评估事件,并将匹配的结果推送到下游sink。

事件率为10k/s,第一个任务的背压保持“高码”,第二个任务的背压保持“好码”,第一个任务的背压保持“高码”,第二个任务的背压保持“好码”,第二个任务的背压保持“好码”。

谢谢

共有1个答案

丁晋
2023-03-14

对你的问题作了更仔细的考虑后,我正在修改我的回答。

听起来CEP正在继续产生火柴,它们正在被推到水槽,但是CEP+水槽任务正在产生很高的背压。找出背压的原因会有帮助。

如果事件可以从所有分区读取,而水印只是勉强向前推进,听起来背压很大,足以阻止事件被吸收。

    null

以下是一些获得更多洞察力的想法:

(1)尝试使用探查器来确定CepOperator是否是瓶颈,并可能识别它正在做什么。

(2)禁用CepOperator和sink之间的操作符链接,以便隔离CEP--仅作为调试步骤。这将为您提供更好的可见性(通过度量和背压监控),以了解CEP和水槽各自在做什么。

 类似资料:
  • Watermark 是一个 jQuery 插件,它可以通过 HTML5 和 JavaScript 在图片上增加水印。 它有以下特点: 用图像或者文字来盖章 允许你在一幅图像上的8个角落任选一个位置加水印 水印图像确定后才决定大小和格式 导出的图像是 base64 类型,所以可能会直接改为老的图片或者上传服务器,例如 Imgur。

  • remote,远程,指的是远程仓库。你可以为项目创建一个远程仓库,然后把项目在本地的仓库推送到远程仓库上。可以设置让其他人也可以访问远程仓库,或者允许他们也可以把自己在本地对项目做的提交推送到远程仓库上。 为项目创建远程仓库,你可以选择一个提供免费远程仓库的服务商,列表见附录。

  • 注:本节未经校验,如有问题欢迎提issue 要了解关于Akka的远程调用能力的简介请参阅位置透明性. 注意 正如那一章所解释的,Akka remoting是按照端到端(peer-to-peer)对等通信的方式设计的,并在建立客户端-服务器(client-server)模式时受到限制。特别是Akka Remoting除其他外,不能与网络地址转换(Network Address Translation

  • The Watermark plugin enables users to condense page layout by adding descriptive labels on top of the input or textarea fields when their value is empty. This plugin is built on the same concept as th

  • This plugin allows you to create a text that's only displayed when there no text on the textbox and disappears when you focus on it. This watermark is a little bit different from others I've seen, tha

  • 问题内容: 我想读取一个远程图像并显示它。我可以保存文件,但无法正确显示代码。理想情况下,我只想尽管正确地传递文件但不进行处理- 不确定是否需要tmp文件步骤。此代码不显示任何内容-没有错误。我也尝试过res.pipe(response)。 问题答案: 好吧,我仍然想知道如何进行上述工作,但是我通过请求模块解决了我的问题!