我正在为使用match_recognize的Flink SQL语句编写单元测试。我正在这样设置测试数据
Table data = tEnv.fromValues(DataTypes.ROW(
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("foobar", DataTypes.STRING()),
....
),
row(...),
row(...)
);
我有两个问题,
FLINK版本:1.11
您遇到了Table API的当前限制:无法与forValue
方法结合定义水印和行时属性;您需要一个连接器。有几个选项可以解决它:
1.使用< code>csv连接器,它与您的< code >值堆叠在一起,如下例所示。
2. 使用内置的数据生成连接器。由于您正在为CEP进行单元测试,因此我想您希望对生成的数据进行一定程度的控制,因此这可能不是一个可行的选择。以为我会提到它,无论如何。
注意:建议使用SQL DDL语法从Flink 1.10创建表。这将使您尝试做的两件事(即定义水印和命名表)更简单:
tEnv.executeSql("CREATE TABLE table_name (\n" +
" event_time TIMESTAMP(3),\n" +
" foobar STRING \n" +
" WATERMARK FOR event_time AS event_time\n" +
") WITH (...)"
);
Table data = tEnv.from("table_name");
水印被声明为计算列,您可以选择使用多个水印策略。有关更多详细信息,请查看此文档页面。
我正在测试flink-cep-sql,我的水印定义为行时间,我的表是kafka表。由于水印依赖于所有kafka分区中的最小值,所以每个新消息都必须等待kafka划分对齐,然后cep触发结果。 我的kafka表(主题有3个分区)定义为 这是我的cep sql cep sql触发结果是正确的,但总是迟到,因为每个分区都需要对齐水印。如何立即获得最新结果或在flink sql表中自动生成水印?
我正在尝试使用flink,以流式和批处理的方式,将大量数据添加到Accumulo中(每分钟几百万个)。我想在将记录发送到accumulo之前对其进行批处理。我从目录或通过kafka摄取数据,使用flatmap转换数据,然后传递给RichSinkFunction,它将数据添加到集合中。 对于流数据,批处理似乎可以,因为我可以将记录添加到固定大小的集合中,一旦达到批处理阈值,这些记录就被发送到accu
我刚刚将我的项目从angular-cli beta.10升级到angular-cli@WebPack(beta.18),现在我的specs.ts都没有解析(找不到名称'description'等)。有关于如何为我的项目配置单元测试的信息吗?
问题内容: 我有一个接受查询,通过SQL服务器运行它,检查错误并返回结果的类。如何对该课程进行单元测试? 编辑 :我将尝试更精确: 负责将查询传递到服务器。为了测试它是否确实这样做,抛出正确的异常等,我想将其连接到将要填充的模拟数据库。我的问题是- 怎么做?如何创建一个模拟“服务器”来处理呼叫? 问题答案: 只需传递一个SQL查询,然后将返回的结果与预期的结果进行比较即可。简单的。JUnit是一个
对于php开发者来说,做单元测试并不是很方便,这也是大部分phper不愿意写单元测试的原因。但是单元测试对提高程序的质量来说,有实在是太重要的了。 有关这一点,我也是在学习java,体验过单元测试的魅力之后才切身体会到的。 那么现在我们就以herosphp为栗子,一起来看看php的单元测试环境如何配置。 首先,毫无疑问,php的单元测试肯定是要依赖phpunit这个工具了。那么我们要先 安装php
单元测试,对独立的代码功能片段,由编写代码的团队进行测试,也是一种编码,而非与之不同的一些事情。设计代码的一部分就是设计它该如何被测试。你应该写一个测试计划,即使它只是一句话。有时候测试很简单:“这个按钮看起来好吗?”,有时候它很复杂:“这个匹配算法可以精确地返回正确的匹配结果?”。 无论任何可能的时候,使用断言检查以及测试驱动。这不仅能尽早发现 bug,而且在之后也很有用,让你在其他方面担心的谜