当前位置: 首页 > 知识库问答 >
问题:

你能用泛型类型实现Flink的AggregateFunction吗?

柳胡媚
2023-03-14

我的目标是为Flink 1.10中的流处理模块提供一个接口。管道包含一个AggregateFunction和其他操作符。所有运算符都有泛型类型,但问题在于AggregateFunction,它无法确定输出类型。

注意:实际的管道有一个滑动EventTimeWindow分配器和一个与AglogFunction一起传递的Window函数,但是使用下面的代码可以更容易地重现错误。

这是一个重现错误的简单测试用例:

    @Test
    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));

        ConfigAPI cfg = new ConfigAPI();

        source
                .keyBy(k -> k.f0)
                .countWindow(5, 1)
                .aggregate(new GenericAggregateFunc<>(cfg))
                .print();


        env.execute();
    }

如您所见,配置类作为参数传递给自定义聚合函数。这是用户将要实现的内容。

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
        @Override
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

        @Override
        public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            acc.f1 += in.f1;
            return acc;
        }
    }

提供的接口是:

    public interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

通用聚合函数:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

        private BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Acc createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Acc add(In in, Acc acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Acc getResult(Acc acc) {
            return acc;
        }
        @Override
        public Acc merge(Acc acc, Acc acc1) {
            return null;
        }
    }

输出日志:

org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
Otherwise the type has to be specified explicitly using type information.

解决方案1(不起作用):起初我以为这是“无法确定返回类型”的常见情况,所以我尝试添加

返回(Types.TUPLE(Types.STRING,Types.INT)),但没有成功。

解决方案 2(工作):我创建了一个具有泛型类型的包装类,名为累加器

这看起来不太优雅,而且与界面的其余部分不太一致。这个问题还有别的解决方法吗?

编辑:感谢@deduper的时间和洞察力,我想我找到了一个解决方案。

解决方案3(工作):我创建了一个新接口,它以以下方式扩展了我的BaseConfigAPIAggregate Function

public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

public interface BaseConfigAPI extends Serializable {
    //These will be implemented directly from AggregateFunction interface
    //Acc createAcc();
    //Acc addAccumulators(In in, Acc acc);
        
    //other methods to override
}

现在,用户只需实现合并配置API

更新:我用框架测试了@deduper的第三个解决方案,它也不工作。似乎异常是由< code>Acc而不是< code>Out类型引发的。仔细看看< code >的内部。aggregate运算符,我意识到有一个重载的< code>aggregate方法多接受两个参数。<代码>类型信息

这就是最简单的解决方案如何在没有任何代码重构的情况下出现的。

解决方案4(工作):

 @Test
 public void aggregateFunction_genericType() throws Exception {
                ...

                .aggregate(
                        new GenericAggregateFunc<>(cfg), 
                        Types.TUPLE(Types.STRING, Types.INT),
                        Types.TUPLE(Types.STRING, Types.INT))
                ...
    }

注:从Flink 1.10.1开始,聚合方法用@PublicEvolving注释。

共有1个答案

卢元龙
2023-03-14

"你能用泛型类型实现Flink的聚合函数吗?"

对你可以。就像你自己已经做的那样。您的错误是由于您如何使用它(如“使用站点泛型”),而不是您如何实现它。

“是否有其他解决方案?”?。。。“

我以简单的升序提出了以下三个候选解决方案...

...
source
       .keyBy(k -> k.f0)
       .countWindow(5, 1)
       .aggregate(new GenericAggregateFunc< Tuple2<String, Integer>, Tuple2<String, Integer> >(cfg)) /* filling in the diamond will aid type inference */
       .print();
...

以上是最简单的,因为您不必重构原始的GenericAgregateFunc;只需在菱形中填入要实例化泛型类的特定类型参数。

还有一个稍微不那么简单的解决方案...

public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;
    GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
        this.cfg = cfg;
    }
    @Override
    public Tuple2<String, Integer> createAccumulator() {
        return cfg.createAcc();
    }
    @Override
    public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
        return cfg.addAccumulators(in, acc);
    }
    @Override
    public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
        return acc;
    }
    @Override
    public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
        return null;
    }
}

虽然这一次只涉及一次小的重构,但在我看来,它比第一次提出的解决方案更简化了整个应用程序。

Flink已经为您处理了“复杂”的泛型多态性。要插入Flink,您只需实例化其内置的泛型<代码>聚合函数即可

因此,您仍然在第二种解决方案中“使用泛型”,但您使用的方式要简单得多。

另一个选项更接近您的原始实现,但需要一些小的重构…

public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {
    
    ...
    @Override
    public Out getResult(Acc acc) {
        return ...;
    }
    ...
}

此外,为了强制用户的配置实现与您的函数兼容的接口,先决条件是…

public interface BaseConfigAPI< In, Acc, Out >{ ... }

在我的实验中,我已经证实,将<code>Out</code>类型参数也添加到<code>BaseConfigAPI</code>中,可以使其兼容。

我确实想到了一个更复杂的替代方案。但是由于越简单越好,我将把更复杂的解决方案留给其他人来提出。

 类似资料:
  • 编译时,此代码将产生以下+错误: 问题是访问中的类型T与列表中的T不相同。如何修复此编译问题?

  • 我被分配了一个问题:编写一个通用的加权元素 到目前为止,我已经创建了类并实现了Compariable,但在为W创建compareTo()方法时遇到了问题。我有: 我遇到的问题是,当我比较权重时,没有找到数据的权重。还有没有其他我必须创建的方法来正确地拥有一个在其中一个变量上实现可比较的类?谢谢你的帮助

  • 问题内容: 我的问题是这样的: 为什么不能使用类Class的new T()和newInstance()实例化泛型? 问题答案: 您需要使用反射(),因为在编译时需要链接其构造函数的类是未知的。因此,编译器无法生成链接。

  • 我试图编写一个函数来返回

  • 和函数类似,实现(implementation)也需要关注保持泛型。(原文:Similar to functions, implementations require care to remain generic.) struct S; // 具体类型 `S` struct GenericVal<T>(T,); // 泛型类型 `GenericVal` // GenericVal 的实现,此处我们