当前位置: 首页 > 知识库问答 >
问题:

Flink CEP事件未触发

融烨华
2023-03-14

我已经在Flink中实现了CEP模式,它按预期工作连接到本地Kafka代理。但是当我连接到基于集群的云kafka设置时,Flink CEP不会触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我正在使用AscendingTimestampExtractor,

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

我也收到警告消息,

AscendingTimestampExtractor:140-违反时间戳单调性:1594017872227

而且我也尝试过使用Assignerwith周期水印和AssignerSusPunctuatedWatermark没有一个是工作

我附上了Flink控制台屏幕截图,其中水印未指定。更新的flink控制台屏幕截图

有人能帮忙吗?

共有1个答案

狄鹏
2023-03-14

CEP必须首先根据水印对输入流进行排序。所以问题可能出在水印上,但您还没有向我们展示足够的信息来调试原因。一个常见的问题是有一个空闲源,这会阻止水印前进。

但还有其他可能的原因。为了调试这种情况,我建议您查看一些指标,无论是在Flink Web UI中,还是在连接了指标系统的情况下在指标系统中。首先,通过查看管道不同阶段的numRecordsIn、numRecordsOut或numRecordsInPerSecond和numRecordsOutPerSecond检查记录是否在流动。

如果有事件,请在作业的不同任务中查看currentOutputWatermark,以查看事件时间是否提前。

更新时间:

您可能正在Kafka使用者上调用assignTimestampsAndWatermarks,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,这将保留整个水印。尝试在源代码生成的数据流上调用assignTimestampsAndWatermarks,看看这是否解决了问题。(当然,如果没有逐分区水印,您将无法使用AscendingTimestampExtractor,因为流将不有序。)

 类似资料:
  • 问题内容: 我有3个文件: js_json.js->用于我的json代码 javascript.js->用于我的javascript函数 index.php 这里的代码为: 这是我的代码: 这里的代码: 我的问题是: 当我单击链接“ Hola Test 1”时,它将起作用并显示消息。问题是,在单击选择选项之后,出现了链接“ Hola Test”,然后单击该链接(“ Hola Test”),该消息没

  • 问题内容: 我有一个带有一列复选框的GridView(GridView的其余部分正在从数据库中填充)。我正在使用AJAX执行不同的功能,并且想知道我是否只是在正确的位置调用了OnCheckedChanged事件。是否应该将其包装在某种UpdatePanel中?我对这一切的工作方式仍然很陌生…基本上,我的目标是在选中复选框后更改数据库中的位值。我知道该怎么做的逻辑,我只是​​不知道我是否以正确的方式

  • 我正在为android创建一个phonegap应用程序,并想使用一些phonegap事件,如“恢复”、“暂停”、“后退按钮”等,但除了“deviceready”事件外,这些事件都不会被触发。以下是我的javascript代码,请检查我是否犯了任何错误: “ondeviceredy()”函数中的警报正在工作。 请帮忙,提前谢谢。

  • 我在Flink SQL中使用了CEP模式,它按照预期连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP没有触发。以下是我的sql: 然后我以json格式发送消息,如 在 flink Web ui 中,水印工作精细 flink Web ui 我运行我的cep sql: 每个Kafka消息,connect_ 这是另一个仍然不起作用的cep sql。并且age

  • 我已经与celledit约会,ajax事件不会在单元格编辑时触发。事件监听器不会被调用。谢谢。 bean侦听器方法

  • 问题内容: 我有两个Redis客户端,在一个文件中,我有一个简单的脚本设置并删除了Redis密钥: 在第二个文件中,我有一个Redis客户端充当订户: 关键的“占位符”已设置,那么是否有充分的理由使我在“消息”处理程序中未获得任何输出? 问题答案: 您忘记了订阅用户客户端订阅特定的频道。此外,如果要监视所有事件,则需要使用基于模式的订阅。 您可能想要执行以下操作(未测试): 请参阅Redis文档和