当前位置: 首页 > 知识库问答 >
问题:

如何在光闪sql表中自动生成水印?

孙自怡
2023-03-14

我正在测试flink-cep-sql,我的水印定义为行时间,我的表是kafka表。由于水印依赖于所有kafka分区中的最小值,所以每个新消息都必须等待kafka划分对齐,然后cep触发结果。

我的kafka表(主题有3个分区)定义为

create table test_table(
    agent_id String, room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time as to_timestamp_ltz(create_time, 3), 
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)

这是我的cep sql

select * from test_table  match_recognize (
   partition by agent_id,room_id,call_type 
   order by row_time
   measures  
       last(BF.create_time) as create_time, 
       last(AF.connect_time) as connect_time 
   one row per match after match SKIP PAST LAST ROW 
   pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
   define 
       BF as BF.connect_time = 0,
       AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type 
) as T ;

cep sql触发结果是正确的,但总是迟到,因为每个分区都需要对齐水印。如何立即获得最新结果或在flink sql表中自动生成水印?

共有1个答案

吴高远
2023-03-14

您的模式要求查找< code>connect_time的行

如果您愿意放松模式匹配语义学,为什么不将此MATCH_RECOGNIZE查询替换为间隔连接(带有时间约束的自连接)。有关详细信息,请参阅https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins。

顺便说一下,这部分定义了AF

... and BF.room_id = AF.room_id and BF.call_type = AF.call_type

没有任何影响,因为流已经由room_idcall_type分区。

 类似资料:
  • 我正在为使用match_recognize的Flink SQL语句编写单元测试。我正在这样设置测试数据 我有两个问题, 如何将event_time指定为水印字段?(表示行时间) 不太重要,给表创建一个有意义的名称? FLINK版本:1.11

  • 我在Adobe Acrobat Pro中制作了包含Radiobutton,文本字段,按钮,复选框和条形码的PDF格式。一切正常。 但根据新的要求,我必须“自动生成”一些字段,如Radiobutton,Text Field和CheckBox,点击“添加字段”按钮,点击该按钮,控件应该自动生成到PDF表单。 附加的,已经添加到PDF表单中的文本,只要字段在表单顶部自动生成,就会向下流动。 根据我的发现

  • 我有 我怎样才能从(x,y)得到一个7x10的矩阵,但每一行都是从前面的一行加1得到的?例如,第一行是 0,1,2,3,4,5,6 第二排 1,2,3,4,5,6,7 等等

  • 我正在使用眨眼计划程序。这是我的 sql test_table是Kafka桌 我设置了表.exec.state.ttl=10000 并运行我的程序,然后我继续发送消息。 由于我将状态ttl和cep interval都设置为10s,当我启动它时,状态的大小在10秒后应该是一个固定的数字。 但事实是,该州至少持续增长15分钟。此外,jvm触发了两次完整的gc。 是否有我尚未配置的配置

  • 问题内容: 我正在尝试在MySQL中生成序列表,以便可以从获得唯一ID 。 问题是我需要动态地多个序列。 首先,我创建了一个表: 然后尝试使用http://dev.mysql.com/doc/refman/5.0/en/information- functions.html#function_last-insert- id中的 示例获取编号 一段时间后,我意识到我还需要安全地为新标签生成行。因此,