我正在为一个项目试验Apache Flink。我正在使用 Flink 来聚合一系列传感器捕获的环境数据。为了计算空气质量指数,我正在尝试实现一个自定义聚合函数,以便在带有窗口的分组选择中使用,但我对类型提示有问题。下面是带有 DataTypeHint 注释的函数代码:
@FunctionHint(
input = {@DataTypeHint("INT"), @DataTypeHint("INT"), @DataTypeHint("INT")},
accumulator = @DataTypeHint("AQIAccumulator"),
output = @DataTypeHint("INT")
)
public class AQI extends AggregateFunction<Integer, AQIAccumulator> {
@Override
public AQIAccumulator createAccumulator() {
return new AQIAccumulator();
}
@Override
public Integer getValue(AQIAccumulator acc) {
return 100;
}
public void accumulate(AQIAccumulator acc, int pm10, int pm25, int co) {
System.out.println("PM10: " + pm10 + ", PM25: " + pm25 + ", CO: " + co);
}
public void retract(AQIAccumulator acc, int pm10, int pm25, int co) {
}
public void merge(AQIAccumulator acc, Iterable<AQIAccumulator> it) {
}
}
但我有以下例外:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.innovaway.seneca.jobs.envdata.functions.AQI'. Please check for implementation mistakes and/or provide a corresponding hint.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:211)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:121)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:270)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:783)
at com.innovaway.seneca.jobs.envdata.EnvironmentDataAggregationJob.main(EnvironmentDataAggregationJob.java:88)
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Error in function hint annotation.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:66)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.types.extraction.TemplateUtils.asFunctionTemplates(TemplateUtils.java:69)
at org.apache.flink.table.types.extraction.TemplateUtils.extractGlobalFunctionTemplates(TemplateUtils.java:46)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:151)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Error in data type hint annotation.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:86)
at org.apache.flink.table.types.extraction.FunctionTemplate.fromAnnotation(FunctionTemplate.java:72)
at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:64)
... 40 more
Caused by: org.apache.flink.table.api.TableException: User-defined types are not supported yet.
at org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)
at org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)
at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)
at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)
at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
at org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)
at org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)
at org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)
at org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)
at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:84)
... 42 more
Process finished with exit code 1
我做错了什么?
数据类型提示的字符串版本仅适用于SQL类型。对于POJO和其他类,您可以使用@DataTypeHint(bridge gedTo=AQIAccumulator.class)
。
或者,您可以简单地重写< code>getTypeInference并以编程方式提供所有组件。
但对于您的示例,Flink应该足够聪明,可以使用反射自动派生所有类型。无需提示。
问题内容: 我正在尝试在Oracle中编写一个自定义聚合函数,并将该函数与其他一些函数一起分组在一个包中。作为一个示例(为了模拟我遇到的问题),假设我的自定义聚合对数字进行求和看起来像: 如果我编写以下函数定义: 和相应的类型声明进行测试: 这个说法: 给出正确的结果70。但是,使用函数定义创建一个包: 并通过以下方式调用: 与爆炸 是否可以在包声明中嵌套自定义聚合函数? 问题答案: Oracle
我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些
我正在研究MongoOperations的聚合函数,以便使用Spring数据进行某种中间层查询。正如aggregate()函数的文档中所定义的:http://docs.spring.io/spring-data/mongoDB/docs/current/api/org/springframework/data/mongoDB/core/mongooperations.html#aggregate-
我有一个处理器,它从主题中获取json字符串,类型为GenericRecord。现在我把这条河分成两条支流。我采用第一个分支,并将(key,value)映射为2个字符串,其中包含一个特定的json字段和该字段的值,然后按key分组。到目前为止,一切都很好。现在,我必须用用户定义的新类型聚合流,并收到一个异常。 这里是代码: 新类型: 好流: 问题是: 这是例外: 我如何解决这个问题? 更新 ---
对于Cassandra中的用户定义聚合函数,什么可以作为INITCOND?我只见过具有简单类型(例如元组)的示例。 我为聚合函数中的状态对象提供了以下类型: 当我省略INITCOND时,我得到一个JavaNullPointerException。
问题内容: 我想在当前的Python 3.5项目中使用类型提示。我的函数应该接收一个函数作为参数。 如何在类型提示中指定类型函数? 我检查了PEP 483,但在那里找不到函数类型提示。 问题答案: 正如@jonrsharpe在评论中指出的,可以使用以下方法完成: 问题是,将其本身翻译为: 一个Callable接受 任意数量的/类型的 参数,并返回任何类型的值。在大多数情况下,这不是您想要的,因为您