当前位置: 首页 > 知识库问答 >
问题:

flink类型提示中的自定义聚合函数

龚迪
2023-03-14

我正在为一个项目试验Apache Flink。我正在使用 Flink 来聚合一系列传感器捕获的环境数据。为了计算空气质量指数,我正在尝试实现一个自定义聚合函数,以便在带有窗口的分组选择中使用,但我对类型提示有问题。下面是带有 DataTypeHint 注释的函数代码:

@FunctionHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT"), @DataTypeHint("INT")},
        accumulator = @DataTypeHint("AQIAccumulator"),
        output = @DataTypeHint("INT")
)
public class AQI extends AggregateFunction<Integer, AQIAccumulator> {

    @Override
    public AQIAccumulator createAccumulator() {
        return new AQIAccumulator();
    }

    @Override
    public Integer getValue(AQIAccumulator acc) {
        return 100;
    }

    public void accumulate(AQIAccumulator acc, int pm10, int pm25, int co) {
        System.out.println("PM10: " + pm10 + ", PM25: " + pm25 + ", CO: " + co);
    }

    public void retract(AQIAccumulator acc, int pm10, int pm25, int co) {

    }

    public void merge(AQIAccumulator acc, Iterable<AQIAccumulator> it) {

    }

}

但我有以下例外:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.innovaway.seneca.jobs.envdata.functions.AQI'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:211)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:121)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
    at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:270)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:783)
    at com.innovaway.seneca.jobs.envdata.EnvironmentDataAggregationJob.main(EnvironmentDataAggregationJob.java:88)
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Error in function hint annotation.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:66)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.types.extraction.TemplateUtils.asFunctionTemplates(TemplateUtils.java:69)
    at org.apache.flink.table.types.extraction.TemplateUtils.extractGlobalFunctionTemplates(TemplateUtils.java:46)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:151)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
    ... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Error in data type hint annotation.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:86)
    at org.apache.flink.table.types.extraction.FunctionTemplate.fromAnnotation(FunctionTemplate.java:72)
    at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:64)
    ... 40 more
Caused by: org.apache.flink.table.api.TableException: User-defined types are not supported yet.
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)
    at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
    at org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)
    at org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)
    at org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)
    at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:84)
    ... 42 more

Process finished with exit code 1

我做错了什么?

共有1个答案

阳勇
2023-03-14

数据类型提示的字符串版本仅适用于SQL类型。对于POJO和其他类,您可以使用@DataTypeHint(bridge gedTo=AQIAccumulator.class)

或者,您可以简单地重写< code>getTypeInference并以编程方式提供所有组件。

但对于您的示例,Flink应该足够聪明,可以使用反射自动派生所有类型。无需提示。

 类似资料:
  • 问题内容: 我正在尝试在Oracle中编写一个自定义聚合函数,并将该函数与其他一些函数一起分组在一个包中。作为一个示例(为了模拟我遇到的问题),假设我的自定义聚合对数字进行求和看起来像: 如果我编写以下函数定义: 和相应的类型声明进行测试: 这个说法: 给出正确的结果70。但是,使用函数定义创建一个包: 并通过以下方式调用: 与爆炸 是否可以在包声明中嵌套自定义聚合函数? 问题答案: Oracle

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

  • 我正在研究MongoOperations的聚合函数,以便使用Spring数据进行某种中间层查询。正如aggregate()函数的文档中所定义的:http://docs.spring.io/spring-data/mongoDB/docs/current/api/org/springframework/data/mongoDB/core/mongooperations.html#aggregate-

  • 我有一个处理器,它从主题中获取json字符串,类型为GenericRecord。现在我把这条河分成两条支流。我采用第一个分支,并将(key,value)映射为2个字符串,其中包含一个特定的json字段和该字段的值,然后按key分组。到目前为止,一切都很好。现在,我必须用用户定义的新类型聚合流,并收到一个异常。 这里是代码: 新类型: 好流: 问题是: 这是例外: 我如何解决这个问题? 更新 ---

  • 问题内容: 我想在当前的Python 3.5项目中使用类型提示。我的函数应该接收一个函数作为参数。 如何在类型提示中指定类型函数? 我检查了PEP 483,但在那里找不到函数类型提示。 问题答案: 正如@jonrsharpe在评论中指出的,可以使用以下方法完成: 问题是,将其本身翻译为: 一个Callable接受 任意数量的/类型的 参数,并返回任何类型的值。在大多数情况下,这不是您想要的,因为您

  • 对于Cassandra中的用户定义聚合函数,什么可以作为INITCOND?我只见过具有简单类型(例如元组)的示例。 我为聚合函数中的状态对象提供了以下类型: 当我省略INITCOND时,我得到一个JavaNullPointerException。