当前位置: 首页 > 知识库问答 >
问题:

如何实现数据集批处理的第一个元组字段元素的keyBy

陆甫
2023-03-14

我正在尝试将我的应用程序从flink流处理转换为flink批处理。

对于flink数据流,我从包含多个JSON对象的预定义文件中读取字符串,并执行从JSON对象到tuple3收集器的flatmap(第一个元素--来自JSON对象的一个字段,第二个元素--来自JSON对象的另一个字段,第三个元素--实际的JSON对象数据)。

DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                                       if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

然后,在窗口中执行元组的first和element的keyBy。

WindowedStream<Tuple3<String, Integer,ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource
            .keyBy(0, 1)
            .timeWindow(Time.seconds(5));

对于flink批处理,如何做数据集批处理的KeyBy,数据集中是否有KeyBy的等价方法

DataSet<String> source = env.readTextFile("file:///path /to/ file");


DataSet<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                                       if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

共有1个答案

楚举
2023-03-14

groupBy似乎是您正在寻找的方法

 类似资料:
  • 问题内容: 我有一个数组: 我想获得此数组的第一个元素。预期结果: 字符串 一个要求: 它不能通过引用传递来完成 ,所以不是一个好的解决方案。 我怎样才能做到这一点? 问题答案: 原始答案,但代价昂贵(O(n)): 在O(1)中: 其他用例等 如果修改(就重置数组指针而言)不是问题,则可以使用: 如果需要数组“副本”,则从理论上讲应该更有效: 使用PHP 5.4+(但如果为空,则可能导致索引错误)

  • 本文向大家介绍JavaScript数组中的第一个元素和最后一个元素?,包括了JavaScript数组中的第一个元素和最后一个元素?的使用技巧和注意事项,需要的朋友参考一下 数组是一组元素。每个元素都有其自己的 索引值。我们可以使用这些索引访问任何元素。但是,对于最后一个元素,直到知道数组中存在的元素数量,我们才知道索引。在这种情况下,我们必须使用逻辑。让我们简要地讨论这些细节。 访问第一个元素 因

  • 问题内容: 我有一个像下面这样的列表,其中第一个元素是id,另一个是字符串: 我只想从此元组列表创建ID列表,如下所示: 我将使用此列表,因此它必须是整数值的列表。 问题答案:

  • 如何移除第一个数组但返回减去第一个元素的数组 在我的示例中,当删除第一个元素时,我应该得到

  • 在HashMap中,我可以使用containsKey(i)或containsValue(i)来检查我是否被使用;对数组也可以这样做吗?我的意思是检查myarray i1中的值是否为z i1==myarray中每个数组的第一个元素的组 在我的例子中{1,3,0,2}

  • 返回数组的第一个元素。 使用 arr[0] 返回传递数组的第一个元素。 const head = arr => arr[0]; head([1, 2, 3]); // 1