当前位置: 首页 > 知识库问答 >
问题:

flink cep sql事件未触发

竺翰海
2023-03-14

我在Flink SQL中使用了CEP模式,它按照预期连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP没有触发。以下是我的sql:

create table agent_action_detail 
(
    agent_id String, 
    room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time  - INTERVAL '1' MINUTE) 
with ('connector'='kafka', 'topic'='agent-action-detail', ...)

然后我以json格式发送消息,如

{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}

在 flink Web ui 中,水印工作精细 flink Web ui

我运行我的cep sql:

select * from agent_action_detail
 match_recognize(
    partition by agent_id 
    order by row_time 
    measures 
        last(BF.create_time) as create_time, 
        first(AF.connect_time) as connect_time 
    one row per match AFTER MATCH SKIP PAST LAST ROW 
    pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
 )

每个Kafka消息,connect_

select * from agent_action_detail match_recognize( partition by agent_id order by row_time  measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)

这是另一个仍然不起作用的cep sql。并且agent_action_detail表被另一个flink sql作为

insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'

共有1个答案

夏侯俊美
2023-03-14

有几种情况可能导致模式匹配不产生任何结果:

  • 输入实际上不包含模式
  • 水印不正确
  • 该模式在某种程度上是病态的

这个特殊的模式没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。

如果您直接使用Flink CEP,我会告诉您尝试添加直到(条件)在(时间)内,以限制可能匹配的数量。

使用MATCH_RECOGNIZE,查看是否可以向模式添加不同的终止元素。

更新:由于您在修改模式后仍然没有得到任何结果,因此您应该确定水印是否是问题的根源。CEP依赖于按时间对输入流进行排序,这取决于水印——但前提是您使用的是事件时间。

最简单的测试方法是切换到使用流转时长:

create table agent_action_detail 
(
    agent_id String, 
    ...
    row_time AS PROCTIME()
)
with (...)

如果这行得通,那么时间戳或水印就是问题所在。例如,如果所有事件都延迟了,您将得不到任何结果。对于您的情况,我想知道row_time列中有没有数据。

如果这不能揭示问题,请分享一个最小的可重复的例子,包括观察问题所需的数据。

 类似资料:
  • 问题内容: 我有3个文件: js_json.js->用于我的json代码 javascript.js->用于我的javascript函数 index.php 这里的代码为: 这是我的代码: 这里的代码: 我的问题是: 当我单击链接“ Hola Test 1”时,它将起作用并显示消息。问题是,在单击选择选项之后,出现了链接“ Hola Test”,然后单击该链接(“ Hola Test”),该消息没

  • 问题内容: 我有一个带有一列复选框的GridView(GridView的其余部分正在从数据库中填充)。我正在使用AJAX执行不同的功能,并且想知道我是否只是在正确的位置调用了OnCheckedChanged事件。是否应该将其包装在某种UpdatePanel中?我对这一切的工作方式仍然很陌生…基本上,我的目标是在选中复选框后更改数据库中的位值。我知道该怎么做的逻辑,我只是​​不知道我是否以正确的方式

  • 我正在为android创建一个phonegap应用程序,并想使用一些phonegap事件,如“恢复”、“暂停”、“后退按钮”等,但除了“deviceready”事件外,这些事件都不会被触发。以下是我的javascript代码,请检查我是否犯了任何错误: “ondeviceredy()”函数中的警报正在工作。 请帮忙,提前谢谢。

  • 我已经在Flink中实现了CEP模式,它按预期工作连接到本地Kafka代理。但是当我连接到基于集群的云kafka设置时,Flink CEP不会触发。 我正在使用AscendingTimestampExtractor, 我也收到警告消息, AscendingTimestampExtractor:140-违反时间戳单调性:1594017872227 而且我也尝试过使用Assignerwith周期水印和

  • 我已经与celledit约会,ajax事件不会在单元格编辑时触发。事件监听器不会被调用。谢谢。 bean侦听器方法

  • 问题内容: 我有两个Redis客户端,在一个文件中,我有一个简单的脚本设置并删除了Redis密钥: 在第二个文件中,我有一个Redis客户端充当订户: 关键的“占位符”已设置,那么是否有充分的理由使我在“消息”处理程序中未获得任何输出? 问题答案: 您忘记了订阅用户客户端订阅特定的频道。此外,如果要监视所有事件,则需要使用基于模式的订阅。 您可能想要执行以下操作(未测试): 请参阅Redis文档和