当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:执行环境和多接收器

慕兴平
2023-03-14

我的问题可能会引起一些混乱,所以请先看说明。找出我的问题可能会有帮助。我会在问题的后面添加我的代码(也欢迎任何关于我的代码结构/实现的建议)。感谢您事先提供的任何帮助!

我的问题:

env.execute()有什么用途?我的代码会输出没有这句话的结果。如果我加上这句话,就会出现一个例外:

-

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) 
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) 
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) 
    at MainClass.main(MainClass.java:114)

描述:编程新手。最近我需要使用Flink批处理来处理一些数据(分组数据、计算标准差等)。然而,我到了一个点,我需要输出两个数据集。结构是这样的

ExecutionEnvironment.getExecutionEnvironment();

将给出正确的索引号(1-10000),所用的时间和数据库连接的数量是不同的,打印的顺序将被颠倒。

OS、DB、其他环境详细信息和版本:IntelliJ IDEA 2017.3.5(Community Edition)Build#ic-173.4674.33,2018年3月6日构建jre:1.8.0_152-release-1024-b15 amd64 jvm:OpenJDK 64位服务器VM由JetBrains S.r.o Windows 10 10.0开发

我的测试代码(Java):

公共静态void main(String[]args)引发异常{ExecutionEnvironment.CreateCollectionsEnvironment();

    //Table is used to calculate the standard deviation as I figured that there is no such calculation in DataSet.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    //Get Data from a mySql database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add index for assigning group (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace index(long) with group number(int), and convert Row to double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    //Using groupBy() to combine individual data of each group into a list, while calculating the mean and range in each group
    //put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;

            // Using max and min to calculate range, using i and sum to calculate mean
            double max = var1.f1;
            double min = max;
            double sum = 0;
            int i = 1;

            // The list is to store individual value
            List<Double> list = new ArrayList<>();
            list.add(max);

            while (it.hasNext())
            {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }

            //Store group number, mean, range, and 5 individual values within the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    //print because if no sink is created, Flink will not even perform the calculation.
    groupDS.print();


    // Get the max group number and range in each group to calculate average range
    // if group number start with 1 then the maximum of group number equals to the number of group
    // However, because this is the second sink, data will flow from source again, which will double the group number
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print as if no sink is created, Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1/ rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {

        // index 1-5 will be assigned to group 1, index 6-10 will be assigned to group 2, etc.
        int n = new Long(input.f0 / 5).intValue() + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}

共有1个答案

柴正祥
2023-03-14

>

  • 可以将一个源连接到多个接收器,源只执行一次,记录广播到多个接收器。看到这个问题可以Flink将结果写入多个文件(像Hadoop的MultipleOutputFormat)吗?

    GetExecutionEnvironment是在您要运行作业时获取环境的正确方法。createCollectionEnvironment是一种很好的玩转和测试的方法。请参阅文档

    异常错误消息非常清楚:如果调用print或collection,则执行数据流。所以您有两个选择:

    • 或者在数据流结束时调用print/collect并执行和打印它。这对测试东西很好。请记住,每个数据流只能调用collectr/print一次,否则会在未完全定义时执行多次
    • 您可以在数据流的末尾添加一个接收器并调用env.execute()。这就是您想要在流处于更成熟的形状时执行的操作。

  •  类似资料:
    • 当代码在运行时,它所在的执行环境非常重要。 执行上下文 在 JavaScript 中,执行上下文与执行环境关系密切,它与函数和变量的声明息息相关,通常认为有两种执行上下文: 全局上下文——代码首次执行的默认环境; 函数上下文——当代码执行进入函数体中。 让我们来看一段包含这几种执行上下文的代码: // 全局上下文​var hello = 'Hello!';​function introduce()

    • 主要内容:编写和执行本章介绍与批处理脚本相关的环境。 编写和执行 通常,要创建批处理文件,可以使用记事本或Ediplus之类的文本编辑器。 这是创建批处理文件的最简单的工具。 接下来是批处理脚本的执行环境。 在Windows系统上,这是通过命令提示符或来完成。 所有批处理文件都在此环境中运行。 以下是启动的两种方法 - 方法1 - 转到并双击文件。如下图所示 - 方法2 - 通过运行命令 - 以下图片显示在Windo

    • 本章主题 ♦ 可调用对象 ♦ 代码对象 ♦ 语句和内置函数 ♦ 执行其他程序 ♦ 终止执行 ♦ 各类操作系统接口 ♦ 相关模块 在Python中有多种运行外部程序的方法,比如,运行操作系统命令或另外的Python脚本,或执行一个磁盘上的文件,或通过网络来运行文件。这完全取决于你想要干什么。有些特定的执行场景包括: 在当前脚本继续运行; 创建和管理子进程; 执行外部命令或程序; 执行需要输入的命令;

    • 执行环境(execution context,为简单起见,有时也称为“环境”)是JavaScript 中最为重要的一个概念。执行环境定义了变量或函数有权访问的其他数据,决定了它们各自的行为。每个执行环境都有一个与之关联的变量对象(variable object),环境中定义的所有变量和函数都保存在这个对象中。虽然我们编写的代码无法访问这个对象,但解析器在处理数据时会在后台使用它。 全局执行环境是最

    • Serverless 与微服务在一点上很吸引人,你可以采用不同的语言来运行你的代码,不同的服务之间可以使用不同的语言。除了,在不同的 Serverless 服务里,采用不同的语言来开发。我们也可以在一个 Serverless 服务里,使用不同的语言来开发服务。 Serverless 多个语言运行环境 这次我们要创建的 Serverless 服务,其实现步骤相当的简单: 使用 serverless

    • 我试图在Quartz 1.6中创建一个,但必须只执行一次,因为我有两个测试实例具有相同版本的.war文件。 这是我的类,将每60秒执行一次: 然后我有我的类来打印一个简单的输出: