当前位置: 首页 > 知识库问答 >
问题:

如何将链接表保存到csv文件?

公西马鲁
2023-03-14

我用的是Flink 1.4.0

我正在尝试将Table API查询的结果保存到CSV文件,但我收到错误。以下是详细信息:

我的输入文件如下所示:

id,species,color,weight,name 
311,canine,golden,75,dog1 
312,canine,brown,22,dog2 
313,feline,gray,8,cat1

我对此运行查询以仅选择犬类,我想将其保存到csv文件中:

ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); 
BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env); 

String inputPath = "location-of-source-file"; 
CsvTableSource petsTableSource = CsvTableSource.builder() 
  .path(inputPath) 
  .ignoreFirstLine() 
  .fieldDelimiter(",") 
  .field("id", Types.INT()) 
  .field("species", Types.STRING()) 
  .field("color", Types.STRING()) 
  .field("weight", Types.DOUBLE()) 
  .field("name", Types.STRING()) 
  .build(); 

// Register our table source 
tableEnv.registerTableSource("pets", petsTableSource); 
Table pets = tableEnv.scan("pets"); 

Table counts = pets 
  .groupBy("species") 
  .select("species, species.count as count") 
  .filter("species === 'canine'"); 

// Convert to Dataset and display results
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class); 
result.print(); 

// Write Results to File 
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets.csv", ","); 
counts.writeToSink(sink); 

当我运行此命令时,我看到数据集的结果被输出:
犬科动物,2

然而,我在输出文件中没有得到任何结果,我在下面看到了这些错误。我能做些什么来解决这个问题?谢谢

2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.

共有1个答案

贲高寒
2023-03-14

你可能错过了一次机会

env.execute(); 

之后

counts.writeToSink(sink);

与立即触发执行的<code>print()</code>相反,<code>writeToSink()</code>只是附加了一个sink操作符,并需要显式触发执行。

< code>TypeExtractor的< code>INFO消息“只是”告诉您,< code>Row不能用作POJO类型,但是在这里没有问题。

 类似资料:
  • 问题内容: 我有一个熊猫DataFrame,我想上传到新的CSV文件。问题是在将文件传输到s3之前,我不想在本地保存文件。是否有像to_csv这样的方法可以将数据帧直接写入s3?我正在使用boto3。 这是我到目前为止的内容: 问题答案: 您可以使用:

  • 我有一个熊猫的数据文件,我想上传到一个新的CSV文件。问题是我不想在将文件转移到S3之前将其保存在本地。是否有类似于to_csv的方法可以直接将数据文件写入s3?我使用的是boto3。 以下是我目前所拥有的:

  • 本文向大家介绍如何将Python词典保存为CSV文件?,包括了如何将Python词典保存为CSV文件?的使用技巧和注意事项,需要的朋友参考一下 CSV(逗号分隔值)是最常见的文件格式,许多平台和应用程序都广泛支持该格式。 使用Python标准库中的csv模块。最简单的方法是在open()函数的帮助下以“ w”模式打开一个csv文件,并以逗号分隔的形式写入键值对。 csv模块包含DictWriter

  • 问题内容: 我正在尝试重组在Excel文件中组织降水数据的方式。为此,我编写了以下代码: 这段代码运行良好,通过Jupyter,我可以看到结果是不错的 但是,尝试将此数据帧保存到csv文件时遇到问题。 结果文件包含垂直索引列,看来我无法调用特定的单元格。 (希望有人可以帮助我解决这个问题)非常感谢! 问题答案: 全部在文档中。 您有兴趣跳过索引列,因此: 如果您还想跳过标题,请添加: 我不知道您的

  • (希望有人能帮我解决这个问题)非常感谢!!

  • 我正在处理一个包含uni_key和createdDate两列的数据帧。我运行一个SQL查询并将结果保存到中,现在我想将这些结果保存到csv文件中。有什么方法可以做到这一点吗?这是一个代码片段: 此代码当前出现以下错误: AttributeError:“DataFrameWriter”对象没有属性“csv”