我在Flink SQL中使用了CEP模式,它按照预期连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP没有触发。以下是我的sql:
create table agent_action_detail
(
agent_id String,
room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time - INTERVAL '1' MINUTE)
with ('connector'='kafka', 'topic'='agent-action-detail', ...)
然后我以json格式发送消息,如
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}
在 flink Web ui 中,水印工作精细 flink Web ui
我运行我的cep sql:
select * from agent_action_detail
match_recognize(
partition by agent_id
order by row_time
measures
last(BF.create_time) as create_time,
first(AF.connect_time) as connect_time
one row per match AFTER MATCH SKIP PAST LAST ROW
pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
)
每个Kafka消息,connect_
select * from agent_action_detail match_recognize( partition by agent_id order by row_time measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)
这是另一个仍然不起作用的cep sql。并且agent_action_detail表被另一个flink sql作为
insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'
有几种情况可能导致模式匹配不产生任何结果:
这个特殊的模式没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。
如果您直接使用Flink CEP,我会告诉您尝试添加直到(条件)
或在(时间)
内,以限制可能匹配的数量。
使用MATCH_RECOGNIZE
,查看是否可以向模式添加不同的终止元素。
更新:由于您在修改模式后仍然没有得到任何结果,因此您应该确定水印是否是问题的根源。CEP依赖于按时间对输入流进行排序,这取决于水印——但前提是您使用的是事件时间。
最简单的测试方法是切换到使用流转时长:
create table agent_action_detail
(
agent_id String,
...
row_time AS PROCTIME()
)
with (...)
如果这行得通,那么时间戳或水印就是问题所在。例如,如果所有事件都延迟了,您将得不到任何结果。对于您的情况,我想知道row_time列中有没有数据。
如果这不能揭示问题,请分享一个最小的可重复的例子,包括观察问题所需的数据。
问题内容: 我有3个文件: js_json.js->用于我的json代码 javascript.js->用于我的javascript函数 index.php 这里的代码为: 这是我的代码: 这里的代码: 我的问题是: 当我单击链接“ Hola Test 1”时,它将起作用并显示消息。问题是,在单击选择选项之后,出现了链接“ Hola Test”,然后单击该链接(“ Hola Test”),该消息没
问题内容: 我有一个带有一列复选框的GridView(GridView的其余部分正在从数据库中填充)。我正在使用AJAX执行不同的功能,并且想知道我是否只是在正确的位置调用了OnCheckedChanged事件。是否应该将其包装在某种UpdatePanel中?我对这一切的工作方式仍然很陌生…基本上,我的目标是在选中复选框后更改数据库中的位值。我知道该怎么做的逻辑,我只是不知道我是否以正确的方式
我正在为android创建一个phonegap应用程序,并想使用一些phonegap事件,如“恢复”、“暂停”、“后退按钮”等,但除了“deviceready”事件外,这些事件都不会被触发。以下是我的javascript代码,请检查我是否犯了任何错误: “ondeviceredy()”函数中的警报正在工作。 请帮忙,提前谢谢。
我已经在Flink中实现了CEP模式,它按预期工作连接到本地Kafka代理。但是当我连接到基于集群的云kafka设置时,Flink CEP不会触发。 我正在使用AscendingTimestampExtractor, 我也收到警告消息, AscendingTimestampExtractor:140-违反时间戳单调性:1594017872227 而且我也尝试过使用Assignerwith周期水印和
我已经与celledit约会,ajax事件不会在单元格编辑时触发。事件监听器不会被调用。谢谢。 bean侦听器方法
问题内容: 我有两个Redis客户端,在一个文件中,我有一个简单的脚本设置并删除了Redis密钥: 在第二个文件中,我有一个Redis客户端充当订户: 关键的“占位符”已设置,那么是否有充分的理由使我在“消息”处理程序中未获得任何输出? 问题答案: 您忘记了订阅用户客户端订阅特定的频道。此外,如果要监视所有事件,则需要使用基于模式的订阅。 您可能想要执行以下操作(未测试): 请参阅Redis文档和