当前位置: 首页 > 知识库问答 >
问题:

Flink CEP无法在联合表上获得正确的结果

卫鸿朗
2023-03-14

我使用 Flink SQL 和 CEP 来识别一些非常简单的模式。但是,我发现了一件奇怪的事情(可能是一个错误)。我有两个示例表password_change转移,如下所示。

转移

transid,accountnumber,sortcode,value,channel,eventtime,eventtype
1,123,1,100,ONL,2020-01-01T01:00:01Z,transfer
3,123,1,100,ONL,2020-01-01T01:00:02Z,transfer
4,123,1,200,ONL,2020-01-01T01:00:03Z,transfer
5,456,1,200,ONL,2020-01-01T01:00:04Z,transfer

password_change

accountnumber,channel,eventtime,eventtype
123,ONL,2020-01-01T01:00:05Z,password_change
456,ONL,2020-01-01T01:00:06Z,password_change
123,ONL,2020-01-01T01:00:08Z,password_change
123,ONL,2020-01-01T01:00:09Z,password_change

以下是我的SQL查询。

首先创建一个临时视图事件作为

(SELECT accountnumber,rowtime,eventtype FROM password_change WHERE channel='ONL') 
UNION ALL 
(SELECT accountnumber,rowtime, eventtype FROM transfer WHERE channel = 'ONL' )

rowtime列是直接从原始eventtime列中提取的事件时间,水印周期范围为1秒。

然后输出查询结果

SELECT * FROM `event`
    MATCH_RECOGNIZE ( 
        PARTITION BY accountnumber 
        ORDER BY rowtime 
        MEASURES 
            transfer.eventtype AS event_type,
            transfer.rowtime AS transfer_time
        ONE ROW PER MATCH 
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (transfer password_change )  WITHIN INTERVAL '5' SECOND 
        DEFINE 
            password_change AS eventtype='password_change', 
            transfer AS eventtype='transfer' 
    )

它应该输出

123,transfer,2020-01-01T01:00:03Z
456,transfer,2020-01-01T01:00:04Z

但在运行Flink 1.11.1时,我什么也没有得到(1.10.1也没有输出)。

此外,我将模式更改为仅< code>password_change,它仍然不输出任何内容,但是如果我将模式更改为< code>transfer,它将输出几行,但不是所有的传输行。如果我交换两个表的eventtime,这意味着让password_changes先发生,那么模式< code>password_change将输出几行,而< code>transfer不会。

另一方面,如果我从两个表中提取这些列并手动将它们合并到一个表中,然后将它们发出到Flink中,则运行结果是正确的。

我搜索并尝试了很多,包括更改SQL语句、水印、缓冲区超时等,但没有任何帮助。希望这里的任何人都能帮忙。谢谢

10/10/2020 更新:

我使用 Kafka 作为表源。tEnv 是 StreamTableEnvironment。

Kafka kafka=new Kafka()
            .version("universal")
            .property("bootstrap.servers", "localhost:9092");

tEnv.connect(
            kafka.topic("transfer")
    ).withFormat(
            new Json()
                .failOnMissingField(true)
    ).withSchema(
            new Schema()
                .field("rowtime",DataTypes.TIMESTAMP(3))
                .rowtime(new Rowtime()
                             .timestampsFromField("eventtime")
                             .watermarksPeriodicBounded(1000)
                )
                .field("channel",DataTypes.STRING())
                .field("eventtype",DataTypes.STRING())
                .field("transid",DataTypes.STRING())
                .field("accountnumber",DataTypes.STRING())
                .field("value",DataTypes.DECIMAL(38,18))
    ).createTemporaryTable("transfer");
        

tEnv.connect(
            kafka.topic("pchange")
    ).withFormat(
            new Json()
                .failOnMissingField(true)
    ).withSchema(
            new Schema()
                .field("rowtime",DataTypes.TIMESTAMP(3))
                .rowtime(new Rowtime()
                            .timestampsFromField("eventtime")
                            .watermarksPeriodicBounded(1000)
                )
                .field("channel",DataTypes.STRING())
                .field("accountnumber",DataTypes.STRING())
                .field("eventtype",DataTypes.STRING())
    ).createTemporaryTable("password_change");

感谢@Dawid Wysakowicz的回答。为了确认这一点,我添加4,123,1,200, ONL,2020-01-01T01:00:10Z,将传输到传输表的末尾,然后输出变为正确,这意味着它确实是水印的一些问题。

所以现在的问题是如何修复它。由于用户不会经常更改密码,因此这两个表之间的时间间隔是不可避免的。我只需要UNION ALL表具有与我手动合并的行为相同的行为。

2020年11月4日更新:
具有空闲源的水印策略可能会有所帮助。

共有1个答案

归翔
2023-03-14

最有可能的问题是与UNION ALL操作符一起生成水印。你能分享一下你是如何创建这两个表的吗,包括你是如何定义时间属性的,以及连接符是什么?它可以让我证实我的怀疑。

我认为问题是其中一个源停止发出水印。如果传输表(或时间戳较低的表)没有完成并且没有产生任何记录,它就不会发出水印。在发出第四行之后,它将发出Watermark=3(4-1秒)。输入联合的水印是两个值中最小的。因此,第一个表将暂停/保持具有valueWatermark=3的水印,因此您看不到原始查询的任何进展,并且您会看到为时间戳较小的表发出的一些记录。

如果您手动连接两个表,您只有一个输入和一个水印源,因此它会进一步发展,您会看到一些结果。

 类似资料:
  • 请读到最后(我在最后提到console.log) 模型: 收藏: 观点: 在我们的应用程序中。js 服务器输出 我还尝试从模型中删除所有属性防御。还是不行。返回值内容类型为:application/json(已验证),并且是有效的json。 我读过:Backbonejs集合长度总是零 但尽管console.log,显示0长度,也: 不工作! 我还读了Did主干收集自动解析加载的数据 非常感谢 更新

  • 嗨,伙计们,我的作业真的需要帮助。我试着做它,但是我不能解决它。 起始板: 期望结果: 然而,每次执行时,每当有赢家时,我总是得到以下信息: 最后,如何计算获胜次数最多的玩家的胜率? 限制: > < li> 您必须对井字游戏棋盘使用2D阵列。(我知道我没有使用2D数组,因为我不知道如何让编译器接受来自2D数组的1-9个输入。) 不能对此问题使用类。 如果愿意,可以使用静态方法。

  • 我正在尝试单元测试我的类,它看起来像:- 我想在类B中模拟“method2()”。我知道我们需要有一个B()的mock对象,这样每当我们调用它的方法时,就会发生模拟。这是我试过的 并使用调用它,现在的主要问题是method2被嘲弄了(即method2()的主体没有被执行),但我无法接收C的对象作为响应。 我的测试场景是:- 我想测试类A的method1(),它反过来调用类B的method2(),但

  • 问题内容: 是否有一种简单的方法来获取当前操作系统使用的行尾类型? 问题答案: 如果您对以文本模式打开的文件进行操作,那么正确的是所有换行符都显示为“ ”。否则,您正在寻找。 从http://docs.python.org/library/os.html: os。 Lineep 在当前平台上用于分隔(或终止)行的字符串。这可以是单个字符,例如对于POSIX是’\ n’,也可以是多个字符,例如对于W

  • 我有以下疑问。 ]) 这是查询的结果。 输出1 我将得到一个$组结果。我将使用$array1。 输出 “计数”结果将复制到所有结果上。 但是$组将创建一个糟糕的结构来读取所有数据。 如何将所有OUTPUT1与OUTPUT2的$组“count”合并? 使用$组“count”,但我不想要$组结构。