当前位置: 首页 > 知识库问答 >
问题:

Flink:Flink是否支持可以处理具有公共字段的不同数据流的抽象运算符?

席言
2023-03-14

假设我们有多个数据流,它们共享一些共同的特性。

例如,我们有一个教师流和一个学生流,它们都有一个年龄字段。如果我想从实时流中找出最大的学生或老师,我可以实现一个运算符,如下所示。

public MaxiumAgeFunc extends RichMapFunction<Student,Integer> {
    int maxAge;

    @Override
    public void flatMap(Student s, Collector<Integer> collector) throws Exception {
        if(s.age > maxAge){
            maxAge = s.age;
        }
        collector.collect(maxAge);
    }
}

为了找出最年长的老师,我们需要实现一个类似的操作符,如下所示

public MaxiumAgeFunc extends RichMapFunction<Teacher,Integer> {
    int maxAge;

    @Override
    public void flatMap(Teacher t, Collector<Integer> collector) throws Exception {
        if(t.age > maxAge){
            maxAge = t.age;
        }
        collector.collect(maxAge);
    }
}

但实际上这两个操作符有共同的流程逻辑,所以我的想法是定义一个父类,比如People。

public class People{
    public Integer age;
}

然后学生和教师可以定义为他们的子类,也可以保留自己的字段。

public class Student extends People {
    public Integer grade;  // student grade
    ...
}
public class Student extends People {
    public Integer subject;  // the subject that teacher teaches
    ...
}

在这种情况下,我可以定义一个操作符,如下所示。

public MaxiumAgeFunc extends RichMapFunction<People,Integer> {
    int maxAge;

    @Override
    public void flatMap(People p, Collector<Integer> collector) throws Exception {
        if(t.age > maxAge){
            maxAge = p.age;
        }
        collector.collect(maxAge);
    }
}

但是,当我尝试使用此操作符实现Flink执行拓扑时,由于数据类型不匹配,它将无法工作。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);

studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();

这就是我的问题,有没有可能为具有公共字段的输入流创建一个抽象运算符?

共有1个答案

吕飞翼
2023-03-14

这比Flink问题更Java:

您要做的是将MaxiumAgeFunc参数化如下

public MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
    int maxAge;

    @Override
    public void flatMap(T p, Collector<Integer> collector) throws Exception {
        if(t.age > maxAge){
            maxAge = p.age;
        }
        collector.collect(maxAge);
    }
}

然后像这样使用它

studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();

编辑:

顺便说一句,您的函数没有使用检查点(因此从检查点恢复时会产生错误的结果),我宁愿使用全局窗口上的聚合函数。

students
    .windowAll(GlobalWindows.create())
    .aggregate(new AggregateFunction<People, Integer, Integer>() {
        @Override
        public Integer createAccumulator() {
            return -1;
        }

        @Override
        public Integer add(People value, Integer accumulator) {
            return Math.max(value.age, accumulator);
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return Math.max(a, b);
        }
    });
 类似资料:
  • 问题内容: 无法通过直接调用类的构造函数来创建对象。只能从派生类调用类的构造函数。因此,在我 看来 ,抽象类的构造函数必须是要么私有的,要么是私有程序包的(对于非常规情况,后者是限制将构造函数的使用限制为封装内的派生类)。但是Java允许类的构造函数为。 在任何情况下,将类的构造函数声明为而不是package-private 有用 吗? 这与“ 抽象类构造函数访问修饰符 ” 问题不是很重复:显然,

  • 问题内容: 在我多年的编程工作中,我经常创建一些类,这些类仅将一些变量与其设置器和获取器组合在一起。我已经看到了这些类型的对象,这些对象称为值对象,域对象或模型对象,具体取决于使用它们的上下文。通用用法最合适的术语似乎是数据传输对象(DTO)。这描述了仅包含访问器和更改器的POJO。 我刚刚编写了一个这样的对象,其中包含大约五十个用于在图表上设置主题参数的字段。现在,我想知道是否应该将这些字段声明

  • 问题内容: 假设我通过创建共享对象并使用LD_PRELOAD首先加载它来替换函数。是否有可能使该功能的参数不同于原始库中的参数? 例如,如果我替换 pthread_mutex_lock ,这样它将代替参数 pthread_mutex_t 而是使用 pthread_my_mutex_t 。可能吗? 其次,除了函数之外,是否可以使用LD_PRELOAD更改结构声明?例如,可以向结构中增加一个字段。 问

  • H全部, 如果有人有任何经验的kafka-spark流对处理各种数据,请给我一个简短的细节,如果这是一个可行的解决方案,并比有两个不同的管道更好。 提前道谢!

  • 在我的搜索引擎中,用户可以选择搜索大小写敏感或不敏感。如果他们选择这样做,查询将搜索使用自定义区分大小写分析器的字段。这是我的设置: 获取/candidates/_settings 因此,我从这个答案中创建了一个名为的自定义分析器。我试图定义我的,如下所示: 把/candidates/_mapping/candidate 因此,在查询区分大小写的匹配时,我可以执行以下操作: 我甚至没有进入最后一步

  • 我有三个数据帧,并试图计算由数据帧1调节的两个数据帧(Df2和Df3)之间的差值。如下面的示例所解释的,我有三个数据帧,Df1、Df2和Df3,它们具有共同的名称。第一步,在Df1中,我想比较“Standard”列和所有三列的值,“das”、“dss”和“tri”可能是行的,并且当这些列的任何值,“das”、“dss”和“tri”高于Df1中的“Standard”时,计算Df2和Df3中相同位置的