我正在使用Flink处理来自某些数据源(如Kafka、Pravega等)的数据。
在我的例子中,数据源是Pravega,它为我提供了一个flink连接器。
我的数据源正在向我发送一些JSON数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
以下是我的代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用FlinkPravegaReader和适当的反序列化程序来获取来自Pravega的JSON流。
然后我尝试将JSON数据转换为String,KeyBy
它们并对它们进行计数。
但是,我得到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎KeyBy抛出了此异常。
嗯,我不是Flink专家,所以我不知道为什么。我已经阅读了官方示例的源代码。在该示例中,有一个custom拆分器,用于将字符串数据拆分为单词。
所以我在想,在这种情况下,我是否也需要使用某种拆分器?如果是,我应该使用什么样的拆分器?你能给我举个例子吗?如果没有,为什么我会出现这样的错误以及如何解决它?
我想您已经阅读了有关如何指定键的文档
指定关键点
示例代码使用按键(“word”),因为word是POJO类型的字段。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
在你的例子中,你在keyBy
之前放了一个map
运算符,这个map
运算符的输出是一个string
。所以在你的例子中显然没有word
字段。如果你真的想对这个string
流进行分组,你需要像这样编写. keyBy(String::toString)
或者,您甚至可以实现自定义的键选择器来生成自己的键。
自定义键选择器
我从基于apache-camel-spark的rest接口获得一个json数组作为输入。开始时,我想通过apache camels路线分割json-array来处理每个元素。我该怎么做? 我的测试输入json: 对于这个问题,我在stackoverflow上找到了一些间接描述的问题: link 1, link 2, link 3。 根据这些示例,我尝试了以下骆驼路线: 当我这样做时,我总是得到以下
如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的
如何拆分阵列?例如,我有一个如下字符数组: 现在,我想用位置6上可以看到的空格拆分数组。拆分后数组将如下所示: 我确实在这里找到了类似的帖子,但在java或Kotlin中没有。 我知道我可以这样做: 但是,如果可能的话,我想要另一种方式。这占用了大量内存,在大型数组上也需要大约30-40毫秒 如何使用java或静态编程语言做到这一点
问题内容: 运行此查询时: 我得到一个这样的表: 我现在想做的是获取相同的信息,但是将数组分成几行,所以我得到的结果是这样的: 如您所见,我不想在“ selected_placements”中获取具有空值的行。 我正在使用PostgreSQL 8.0.2。 非常感谢! 问题答案: 我建议您升级Postgres版本。所有受支持的版本均支持: 在早期版本中,您可以尝试一次将它们选出来。尽管已在9.5中
使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每
问题内容: Mac是否有任何应用程序可以拆分sql文件甚至脚本?我有一个大文件,我必须将其上传到不支持8 MB以上文件的主机。 *我没有SSH访问权限 问题答案: 您可以使用此:http : //www.ozerov.de/bigdump/ 或者 使用此命令拆分sql文件 split命令获取一个文件并将其分成多个文件。-l 5000部分告诉它每五千行分割一次文件。下一部分是文件的路径,下一部分是要