我用的是Flink 1.4.0
我正在尝试将Table API查询的结果保存到CSV文件,但我收到错误。以下是详细信息:
我的输入文件如下所示:
id,species,color,weight,name 311,canine,golden,75,dog1 312,canine,brown,22,dog2 313,feline,gray,8,cat1
我对此运行查询以仅选择犬类,我想将其保存到csv文件中:
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
String inputPath = "location-of-source-file";
CsvTableSource petsTableSource = CsvTableSource.builder()
.path(inputPath)
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("species", Types.STRING())
.field("color", Types.STRING())
.field("weight", Types.DOUBLE())
.field("name", Types.STRING())
.build();
// Register our table source
tableEnv.registerTableSource("pets", petsTableSource);
Table pets = tableEnv.scan("pets");
Table counts = pets
.groupBy("species")
.select("species, species.count as count")
.filter("species === 'canine'");
// Convert to Dataset and display results
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();
// Write Results to File
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets.csv", ",");
counts.writeToSink(sink);
当我运行此命令时,我看到数据集的结果被输出:
犬科动物,2
然而,我在输出文件中没有得到任何结果,我在下面看到了这些错误。我能做些什么来解决这个问题?谢谢
2018-05-27 13:29:17,040 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields 2018-05-27 13:29:17,040 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields 2018-05-27 13:29:17,040 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields. 2018-05-27 13:29:17,047 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields 2018-05-27 13:29:17,047 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields 2018-05-27 13:29:17,047 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
你可能错过了一次机会
env.execute();
之后
counts.writeToSink(sink);
与立即触发执行的<code>print()</code>相反,<code>writeToSink()</code>只是附加了一个sink操作符,并需要显式触发执行。
< code>TypeExtractor的< code>INFO消息“只是”告诉您,< code>Row不能用作POJO类型,但是在这里没有问题。
问题内容: 我有一个熊猫DataFrame,我想上传到新的CSV文件。问题是在将文件传输到s3之前,我不想在本地保存文件。是否有像to_csv这样的方法可以将数据帧直接写入s3?我正在使用boto3。 这是我到目前为止的内容: 问题答案: 您可以使用:
我有一个熊猫的数据文件,我想上传到一个新的CSV文件。问题是我不想在将文件转移到S3之前将其保存在本地。是否有类似于to_csv的方法可以直接将数据文件写入s3?我使用的是boto3。 以下是我目前所拥有的:
本文向大家介绍如何将Python词典保存为CSV文件?,包括了如何将Python词典保存为CSV文件?的使用技巧和注意事项,需要的朋友参考一下 CSV(逗号分隔值)是最常见的文件格式,许多平台和应用程序都广泛支持该格式。 使用Python标准库中的csv模块。最简单的方法是在open()函数的帮助下以“ w”模式打开一个csv文件,并以逗号分隔的形式写入键值对。 csv模块包含DictWriter
问题内容: 我正在尝试重组在Excel文件中组织降水数据的方式。为此,我编写了以下代码: 这段代码运行良好,通过Jupyter,我可以看到结果是不错的 但是,尝试将此数据帧保存到csv文件时遇到问题。 结果文件包含垂直索引列,看来我无法调用特定的单元格。 (希望有人可以帮助我解决这个问题)非常感谢! 问题答案: 全部在文档中。 您有兴趣跳过索引列,因此: 如果您还想跳过标题,请添加: 我不知道您的
(希望有人能帮我解决这个问题)非常感谢!!
我正在处理一个包含uni_key和createdDate两列的数据帧。我运行一个SQL查询并将结果保存到中,现在我想将这些结果保存到csv文件中。有什么方法可以做到这一点吗?这是一个代码片段: 此代码当前出现以下错误: AttributeError:“DataFrameWriter”对象没有属性“csv”