当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink中拆分NodeObject的数据

燕和裕
2023-03-14

我正在使用Flink处理来自某些数据源(如Kafka、Pravega等)的数据。

在我的例子中,数据源是Pravega,它为我提供了一个flink连接器。

我的数据源正在向我发送一些JSON数据,如下所示:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

以下是我的代码:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

如您所见,我使用FlinkPravegaReader和适当的反序列化程序来获取来自Pravega的JSON流。

然后我尝试将JSON数据转换为String,KeyBy它们并对它们进行计数。

但是,我得到一个错误:

 The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

似乎KeyBy抛出了此异常。

嗯,我不是Flink专家,所以我不知道为什么。我已经阅读了官方示例的源代码。在该示例中,有一个custom拆分器,用于将字符串数据拆分为单词。

所以我在想,在这种情况下,我是否也需要使用某种拆分器?如果是,我应该使用什么样的拆分器?你能给我举个例子吗?如果没有,为什么我会出现这样的错误以及如何解决它?

共有1个答案

井学
2023-03-14

我想您已经阅读了有关如何指定键的文档

指定关键点

示例代码使用按键(“word”),因为word是POJO类型的字段。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

在你的例子中,你在keyBy之前放了一个map运算符,这个map运算符的输出是一个string。所以在你的例子中显然没有word字段。如果你真的想对这个string流进行分组,你需要像这样编写. keyBy(String::toString)

或者,您甚至可以实现自定义的键选择器来生成自己的键。

自定义键选择器

 类似资料:
  • 我从基于apache-camel-spark的rest接口获得一个json数组作为输入。开始时,我想通过apache camels路线分割json-array来处理每个元素。我该怎么做? 我的测试输入json: 对于这个问题,我在stackoverflow上找到了一些间接描述的问题: link 1, link 2, link 3。 根据这些示例,我尝试了以下骆驼路线: 当我这样做时,我总是得到以下

  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的

  • 如何拆分阵列?例如,我有一个如下字符数组: 现在,我想用位置6上可以看到的空格拆分数组。拆分后数组将如下所示: 我确实在这里找到了类似的帖子,但在java或Kotlin中没有。 我知道我可以这样做: 但是,如果可能的话,我想要另一种方式。这占用了大量内存,在大型数组上也需要大约30-40毫秒 如何使用java或静态编程语言做到这一点

  • 问题内容: 运行此查询时: 我得到一个这样的表: 我现在想做的是获取相同的信息,但是将数组分成几行,所以我得到的结果是这样的: 如您所见,我不想在“ selected_placements”中获取具有空值的行。 我正在使用PostgreSQL 8.0.2。 非常感谢! 问题答案: 我建议您升级Postgres版本。所有受支持的版本均支持: 在早期版本中,您可以尝试一次将它们选出来。尽管已在9.5中

  • 问题内容: Mac是否有任何应用程序可以拆分sql文件甚至脚本?我有一个大文件,我必须将其上传到不支持8 MB以上文件的主机。 *我没有SSH访问权限 问题答案: 您可以使用此:http : //www.ozerov.de/bigdump/ 或者 使用此命令拆分sql文件 split命令获取一个文件并将其分成多个文件。-l 5000部分告诉它每五千行分割一次文件。下一部分是文件的路径,下一部分是要

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每