当前位置: 首页 > 知识库问答 >
问题:

Apache Flink中的事件时间临时连接仅适用于小型数据集

鲜于璞瑜
2023-03-14

背景:我正在尝试使用从CSV文件读取的两个“大(r)”数据集/表(左表中16K行,右表中少一些)进行事件时临时联接。这两个表都是仅附加的表,即它们的数据源当前是CSV文件,但将成为Debezium在Pulsar上发出的CDC变更日志。我使用的是相当新的语法SYSTEM\u TIME。

问题:连接结果只有部分正确,即在执行查询的开始(前20%左右),左侧的行与右侧的行不匹配,而理论上应该匹配。几秒钟后,有更多匹配,当查询结束时,左侧的行与右侧的行正确匹配/连接。每次我运行查询时,它都会显示其他结果,显示哪些行(不)匹配。

两个数据集都不是按各自的事件时间排序的。它们是按主键排序的。所以这确实是这种情况,只是有更多的数据。

本质上,右侧是一个随时间变化的查找表,我们可以确定,对于每个左侧记录,都有一个匹配的右侧记录,因为这两个记录都是在/同时在原始数据库中创建的。最终,我们的目标是创建一个动态物化视图,该视图包含的数据与加入支持CDC的源数据库(SQL Server)中的两个表时的数据相同。

显然,我希望在完整的数据集上实现正确的连接,如Flink docs
中所述与简单的示例和Flink测试代码不同,只有几行的小数据集(如此处),较大数据集的连接不会产生正确的结果。

我怀疑,当探测/左表开始流动时,构建/右表还没有“在内存中”,这意味着左行找不到匹配的右行,而它们应该找到——如果右表早一些开始流动的话。这就是为什么左联接为右表的列返回空值的原因。

我已经包括了我的代码:

java prettyprint-override">@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {

    private final String emr01Ddl =
            "CREATE TABLE EMR01\n" +
                    "(\n" +
                    "    SRC_NO         STRING,\n" +
                    "    JRD_ETT_NO     STRING,\n" +
                    "    STT_DT         DATE,\n" +
                    "    MGT_SLT_DT     DATE,\n" +
                    "    ATM_CRT_DT     DATE,\n" +
                    "    LTD_MDT_IC     STRING,\n" +
                    "    CPN_ORG_NO     STRING,\n" +
                    "    PTY_NO         STRING,\n" +
                    "    REG_USER_CD    STRING,\n" +
                    "    REG_TS         TIMESTAMP,\n" +
                    "    MUT_USER_CD    STRING,\n" +
                    "    MUT_TS         TIMESTAMP(3),\n" +
                    "    WATERMARK FOR MUT_TS AS MUT_TS,\n" +
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "   'connector' = 'filesystem',\n" +
                    "   'path' = '" + getCsv1() + "',\n" +
                    "   'format' = 'csv'\n" +
                    ")";

    private final String emr02Ddl =
            "CREATE TABLE EMR02\n" +
                    "(\n" +
                    "    CPN_ORG_NO  STRING,\n" +
                    "    DSB_TX      STRING,\n" +
                    "    REG_USER_CD STRING,\n" +
                    "    REG_TS      TIMESTAMP,\n" +
                    "    MUT_USER_CD STRING,\n" +
                    "    MUT_TS      TIMESTAMP(3),\n" +
                    "    WATERMARK FOR MUT_TS AS MUT_TS,\n" +
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "   'connector' = 'filesystem',\n" +
                    "   'path' = '" + getCsv2() + "',\n" +
                    "   'format' = 'csv'\n" +
                    ")";

    @Test
    public void testEventTimeTemporalJoin() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(emr01Ddl);
        tableEnv.executeSql(emr02Ddl);

        Table result = tableEnv.sqlQuery("" +
                "SELECT *" +
                "   FROM EMR01" +
                "   LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS" +
                "       ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");

        tableEnv.toChangelogStream(result).addSink(new TestSink());
        env.execute();

        System.out.println("[Count]" + TestSink.values.size());
        //System.out.println("[Row 1]" + TestSink.values.get(0));
        //System.out.println("[Row 2]" + TestSink.values.get(1));
        AtomicInteger i = new AtomicInteger();
        TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " + i.incrementAndGet() + " ]=" + value));
    }

    private static class TestSink implements SinkFunction<Row> {

        // must be static
        public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Row value, SinkFunction.Context context) {
            values.add(value);
        }
    }

    String getCsv1() {
        try {
            return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    String getCsv2() {
        try {
            return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

有没有办法解决这个问题?例如。有没有办法首先将右侧加载到Flink状态,然后开始加载/流式传输左侧?这是一个好方法吗,因为这个问题需要:多久以后?左侧可以开始流动的时间是什么时候?

我们使用的是Flink 1.13.3。

共有1个答案

郗福
2023-03-14

这种时间/版本连接依赖于精确的水印。Flink依赖于水印来知道哪些行可以安全地从正在维护的状态中删除(因为它们不再影响结果)。

您使用的水印表示行是按MUT\TS排序的。因为这不是真的,所以联接无法生成完整的结果。

为了解决这个问题,水印应该用这样的东西来定义

WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE

其中,间隔表示需要容纳多少无序情况。

 类似资料:
  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 问题内容: 在这里使用新的logstash jdbc连接器: https://www.elastic.co/guide/zh-CN/logstash/current/plugins-inputs- jdbc.html 后续logstash运行如何影响已经编入ElasticSearch的内容?是在ES索引中创建新文档,还是更新与已经被索引的行匹配的文档?我尝试解决的用例是将带有时间戳的行索引到ela

  • 你可以创建一个 ODBC 连接来连接你的 ODBC 数据源。下列的指示引导你通过创建一个新 ODBC 连接的程序。 设置一个 ODBC 数据源连接 若要设置数据源的连接,你要安装相关的驱动程序。 然后,使用 ODBC 管理员来设置 DSN(数据源名)。 【注意】你可以向驱动程序提供者咨询有关如何设置数据源名。 在 Navicat Data Modeler 连接到 ODBC 数据源 连接名 一个最恰

  • 你可以创建一个 ODBC 连接来连接你的 ODBC 数据源。下列的指示引导你通过创建一个新 ODBC 连接的进程。 设置一个 ODBC 数据源连接 在控制面板,选择“管理工具”。 选择“ODBC 数据源”。 选择“用户 DSN”选项卡。 点击“添加”。 选择合适的 ODBC 驱动程序并点击“完成”。 输入所需的信息。 选择“确定”来创建你的 ODBC 驱动程序到列表。 在 Navicat Data

  • 你可以创建一个连接来连接到你的服务器:MySQL、MariaDB、PostgreSQL、Oracle、SQLite、SQL Server。 【注意】连接属性的选项卡会根据你所选择的数据库类型而有所不同。 常规设置 高级设置 数据库设置 SSL 设置 SSH 设置 HTTP 设置

  • 你可以创建一个连接来连接到你的服务器:MySQL、MariaDB、PostgreSQL、Oracle、SQLite、SQL Server。 【注意】连接属性的选项卡会根据你所选择的数据库类型而有所不同。 常规设置 高级设置 数据库设置 SSL 设置 SSH 设置 HTTP 设置