我正在使用Apache Flink尝试从Kafka获取JSON记录到InfluxDB,并在过程中将它们从一个JSON记录拆分为多个InfluxDB点。
我找到了平面图
转换,感觉它符合目的。核心代码如下所示:
DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
@Override
public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
Iterator<Entry<String, JsonNode>> iterator = //...
while (iterator.hasNext()) {
// extract point from input
InfluxDBPoint point = //...
out.collect(point);
}
}
});
出于某种原因,我只将其中一个收集的点流式传输到数据库中。
即使我打印出所有映射的条目,它似乎也能正常工作:dataStream.print()
产生:
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3
我是否误解了平面图,或者流入连接器中是否存在一些错误?
问题实际上与以下事实有关:流入中的一个系列(由标记集和度量定义,如图所示)每次只能有一个点,因此,即使我的字段不同,最终点覆盖了具有相同时间值的所有之前的点。
我在看一些旧代码,我试图重写它,但我遇到了一个问题。 这是运行良好的旧代码: 但是我使用的是Spring Data 2,其中的页面映射方法采用了一个函数而不是一个转换器,所以我不知道如何重写这个。 我读了这个主题:如何映射页面 有人能用SpringData2方法(Function而不是Converter)翻译上面的代码片段吗?
我想通过flatMap向数据集生成的每个组应用一个函数。groupBy(分组依据)。尝试调用flatMap时,我发现编译器错误: 我的代码: 事实上,在flink-0.9-SNAPSHOT的留档中没有列出或类似的。是否有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?
我有两个表和一个表来映射我以前的两个表的关系,我如何使用Spring data JPA高效自动地将数据插入到映射的表中?下面是表结构。 用户(id(PK),名称,电子邮件,userRoleId)角色(id(PK),名称,userRoleId)用户角色(id(PK),userId(FK<-User),roleId(FK<-Role))
我在spark中有一个数据集,只有一列,这列是一个Map[String,Any]。我想逐行映射数据集,然后逐键映射映射映射列,计算每个键的值,并使用新数据生成与前一个相同类型的新数据集。 例如: 我想在每个值的末尾加上“”,结果将是一个数据类型的数据集,如下所示: 谢谢Nir
我喜欢理解如何从df映射数据,以便它可以用作Kmeans的输入。 数据库的“布局”是D8:二进制(nullable=true) --field9:二进制(nullable=true)
问题内容: 标题中几乎说了出来。我有一堂课,看起来像这样: 使用Oracle 11g,该列为a ,但序列产生一个a 。数据库显然对此感到满意,但是应用程序需要能够支持可能已插入到应用程序外部此列中的非数字ID。 考虑上面的代码,我得到一个。有什么办法可以做这种映射吗? 使用Hibernate 3.6。 问题答案: 实现一个自定义的IdentifierGenerator类;从博客文章: 像这样注释实