背景:我正在尝试使用从CSV文件读取的两个“大(r)”数据集/表(左表中16K行,右表中少一些)进行事件时临时联接。这两个表都是仅附加的表,即它们的数据源当前是CSV文件,但将成为Debezium在Pulsar上发出的CDC变更日志。我使用的是相当新的语法SYSTEM\u TIME。
问题:连接结果只有部分正确,即在执行查询的开始(前20%左右),左侧的行与右侧的行不匹配,而理论上应该匹配。几秒钟后,有更多匹配,当查询结束时,左侧的行与右侧的行正确匹配/连接。每次我运行查询时,它都会显示其他结果,显示哪些行(不)匹配。
两个数据集都不是按各自的事件时间排序的。它们是按主键排序的。所以这确实是这种情况,只是有更多的数据。
本质上,右侧是一个随时间变化的查找表,我们可以确定,对于每个左侧记录,都有一个匹配的右侧记录,因为这两个记录都是在/同时在原始数据库中创建的。最终,我们的目标是创建一个动态物化视图,该视图包含的数据与加入支持CDC的源数据库(SQL Server)中的两个表时的数据相同。
显然,我希望在完整的数据集上实现正确的连接,如Flink docs
中所述与简单的示例和Flink测试代码不同,只有几行的小数据集(如此处),较大数据集的连接不会产生正确的结果。
我怀疑,当探测/左表开始流动时,构建/右表还没有“在内存中”,这意味着左行找不到匹配的右行,而它们应该找到——如果右表早一些开始流动的话。这就是为什么左联接为右表的列返回空值的原因。
我已经包括了我的代码:
java prettyprint-override">@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {
private final String emr01Ddl =
"CREATE TABLE EMR01\n" +
"(\n" +
" SRC_NO STRING,\n" +
" JRD_ETT_NO STRING,\n" +
" STT_DT DATE,\n" +
" MGT_SLT_DT DATE,\n" +
" ATM_CRT_DT DATE,\n" +
" LTD_MDT_IC STRING,\n" +
" CPN_ORG_NO STRING,\n" +
" PTY_NO STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv1() + "',\n" +
" 'format' = 'csv'\n" +
")";
private final String emr02Ddl =
"CREATE TABLE EMR02\n" +
"(\n" +
" CPN_ORG_NO STRING,\n" +
" DSB_TX STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv2() + "',\n" +
" 'format' = 'csv'\n" +
")";
@Test
public void testEventTimeTemporalJoin() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(emr01Ddl);
tableEnv.executeSql(emr02Ddl);
Table result = tableEnv.sqlQuery("" +
"SELECT *" +
" FROM EMR01" +
" LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS" +
" ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");
tableEnv.toChangelogStream(result).addSink(new TestSink());
env.execute();
System.out.println("[Count]" + TestSink.values.size());
//System.out.println("[Row 1]" + TestSink.values.get(0));
//System.out.println("[Row 2]" + TestSink.values.get(1));
AtomicInteger i = new AtomicInteger();
TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " + i.incrementAndGet() + " ]=" + value));
}
private static class TestSink implements SinkFunction<Row> {
// must be static
public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Row value, SinkFunction.Context context) {
values.add(value);
}
}
String getCsv1() {
try {
return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String getCsv2() {
try {
return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
有没有办法解决这个问题?例如。有没有办法首先将右侧加载到Flink状态,然后开始加载/流式传输左侧?这是一个好方法吗,因为这个问题需要:多久以后?左侧可以开始流动的时间是什么时候?
我们使用的是Flink 1.13.3。
这种时间/版本连接依赖于精确的水印。Flink依赖于水印来知道哪些行可以安全地从正在维护的状态中删除(因为它们不再影响结果)。
您使用的水印表示行是按MUT\TS排序的。因为这不是真的,所以联接无法生成完整的结果。
为了解决这个问题,水印应该用这样的东西来定义
WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
其中,间隔表示需要容纳多少无序情况。
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想
问题内容: 在这里使用新的logstash jdbc连接器: https://www.elastic.co/guide/zh-CN/logstash/current/plugins-inputs- jdbc.html 后续logstash运行如何影响已经编入ElasticSearch的内容?是在ES索引中创建新文档,还是更新与已经被索引的行匹配的文档?我尝试解决的用例是将带有时间戳的行索引到ela
你可以创建一个 ODBC 连接来连接你的 ODBC 数据源。下列的指示引导你通过创建一个新 ODBC 连接的程序。 设置一个 ODBC 数据源连接 若要设置数据源的连接,你要安装相关的驱动程序。 然后,使用 ODBC 管理员来设置 DSN(数据源名)。 【注意】你可以向驱动程序提供者咨询有关如何设置数据源名。 在 Navicat Data Modeler 连接到 ODBC 数据源 连接名 一个最恰
你可以创建一个 ODBC 连接来连接你的 ODBC 数据源。下列的指示引导你通过创建一个新 ODBC 连接的进程。 设置一个 ODBC 数据源连接 在控制面板,选择“管理工具”。 选择“ODBC 数据源”。 选择“用户 DSN”选项卡。 点击“添加”。 选择合适的 ODBC 驱动程序并点击“完成”。 输入所需的信息。 选择“确定”来创建你的 ODBC 驱动程序到列表。 在 Navicat Data
你可以创建一个连接来连接到你的服务器:MySQL、MariaDB、PostgreSQL、Oracle、SQLite、SQL Server。 【注意】连接属性的选项卡会根据你所选择的数据库类型而有所不同。 常规设置 高级设置 数据库设置 SSL 设置 SSH 设置 HTTP 设置
你可以创建一个连接来连接到你的服务器:MySQL、MariaDB、PostgreSQL、Oracle、SQLite、SQL Server。 【注意】连接属性的选项卡会根据你所选择的数据库类型而有所不同。 常规设置 高级设置 数据库设置 SSL 设置 SSH 设置 HTTP 设置