我想将一系列尝试加入到一个静态的阻止电子邮件列表中,并按IP对结果进行分组,以便稍后统计一组相关的统计数据。结果应在每10秒后以30分钟的滑动窗口交付。以下是我尝试实现这一目标的几种方法之一:
override fun performQuery(): Table {
val query = "SELECT ip, " +
"COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
"COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
"COUNT(DISTINCT id) accounts, " +
"COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
"COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
"FROM Attempts " +
"LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
"WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
"GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
return runQuery(query)
.select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
这使用下面的用户定义的表函数,该函数已在my tableEnv中注册为BlockedEmailList:
public class BlockedEmailsList extends TableFunction<Row> {
private Collection<String> emails;
public BlockedEmailsList(Collection<String> emails) {
this.emails = emails;
}
public Row read(String email) {
return Row.of(email);
}
public void eval() {
this.emails.forEach(email -> collect(read(email)));
}
}
但是,它返回以下错误:
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
如果我按照建议执行并将创建的时间戳转换为时间戳,我会得到以下结果:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
我在这里发现了与这些异常相关的堆栈溢出问题,但它们涉及流和时态表,没有一个解决将流连接到静态列表的问题。
有什么想法吗?
编辑:我的用例的Flink项目中似乎有一个未解决的问题:https://cwiki.apache.org/confluence/display/FLINK/FLIP-17DataStream API的侧面输入
所以,我也接受变通建议。
我设法实现了一个解决我的问题的解决方法!
我没有将流式尝试与静态电子邮件列表连接起来,而是预先将每次尝试映射到一个新的尝试,并添加了一个blockedEmail属性。如果静态列表包含当前尝试的电子邮件,我将其属性设置为true。
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
@Override
public Attempt map(Attempt attempt) throws Exception {
if (blockedEmails.contains(attempt.getEmail())) {
attempt.setBlockedEmail(true);
}
return attempt;
}
});
静态列表block edEmails
的类型为HashSet
,因此查找将是O(1)。
最后,分组查询被调整为:
override fun performQuery(): Table {
val query = "SELECT ip, " +
"COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
"COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
"COUNT(DISTINCT id) accounts, " +
"COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
"COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
"FROM Attempts " +
"WHERE Attempts.email <> '' " +
"GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
return runQuery(query)
.select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
到目前为止,流和静态列表之间的连接问题似乎尚未解决,但在我的案例中,上述解决方案很好地解决了这个问题。
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
原因是横向表函数是Flink正则联接,正则联接将发送空值,例如
left:(K0, A), right(K1, T1) => send (K0, A, NULL, NULL)
left: , right(K0, T2) => retract (K0, A, NULL, NULL )
send (K0, A, K0, T2)
因此,输入流的时间属性在连接后会丢失。
在您的情况下,您不需要TableFunction,您可以使用标量函数,例如:
public static class BlockedEmailFunction extends ScalarFunction {
private static List<String> blockedEmails = ...;
public Boolean eval(String email) {
return blockedEmails.contains(attempt.getEmail());
}
}
// register function
env.createTemporarySystemFunction("blockedEmailFunction", BlockedEmailFunction.class);
// call registered function in SQL and do window operation as your expected
env.sqlQuery("SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts");
今天,我想讨论一个关于Flink的概念性话题,而不是一个技术性话题。 在我们的例子中,我们确实有两个Kafka主题A和B,需要连接。连接应该始终包括主题A中的所有元素,以及主题B中的所有新元素。实现这一点有两种可能:始终创建一个新的使用者并从一开始就开始使用主题A,或者在使用后将主题A中的所有元素保持在一个状态内。现在,技术方法是通过连接两个数据流,这很快就向我们展示了它在这个用例中的局限性,因为
我正在评估Apache Flink的流处理,作为Apache Spark的替代品/补充。我们通常使用Spark解决的任务之一是数据扩充。 也就是说,我有来自物联网传感器的带有传感器ID的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量传感器元数据流。 在星火中,我可以和RDD一起加入数据流。 我可以用Apache Flink做同样的技巧吗?我在这方面没有看到直接的API。我唯一的想法
我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?
问题内容: 我有一个列表,例如:thing1,thing2,thing3。我想将它们插入具有相同外键的查找表中。因此理想情况下,它看起来应该像这样: 看来完成此操作的唯一方法是将列表转换为查询,但是我想知道,是否有更简单的方法? 这是我尝试过的: 我听说您无法在cfquery中执行cfloop,但是我什至不确定这是否成立,因为VALUES中没有逗号,并且我不确定如何说“ cfloop中的“当前迭代
问题内容: 最新的()利用TypeScript 2.1中添加的功能,即。这是一件好事,因为现在的输入是正确的,因为在更新之前,“不知道”的输入是合并的,而不是替换它。 同样,使用使得功能在允许输入方面非常严格。无法再将未在组件定义中定义的属性添加到中(的第二个泛型)。 但是,也很难定义动态更新处理程序。例如: 该函数将引发以下错误 即使类型是。 我无法找到一个解决方案,比拥有一个单独的其他和功能。
我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端