当前位置: 首页 > 知识库问答 >
问题:

从pyspark作业在s3 bucket中动态创建文件夹

莫河
2023-03-14

我正在将数据写入s3 bucket,并使用pyspark创建parquet文件。我的桶结构如下所示:

s3a://rootfolder/subfolder/table/

子文件夹和表这两个文件夹应该在运行时创建,如果文件夹不存在,如果文件夹存在,则应该在文件夹表中创建拼接文件。

当我在本地计算机上运行pyspark程序时,它会用_$folder$(liketable_$folder$)创建额外的文件夹,但是如果在emr上运行相同的程序,它会用_success创建。

writing into s3: (pyspark program)
 data.write.parquet("s3a://rootfolder/sub_folder/table/", mode="overwrite")

是否有办法只在s3中创建文件夹,如果不存在,并且不创建像table_$folder$或with_success这样的文件夹。

共有1个答案

申屠泳
2023-03-14

s3a连接器(org.apache.hadoop.fs.s3a.s3Afilesystem)不创建$folder$文件。它生成目录标记为path+/,。例如,mkdir s3a://bucket/a/b创建一个零字节标记对象/a/b/。这将它与具有路径/a/b的文件区别开来

  1. 如果在本地使用s3n:URL。住手。使用S3a连接器。
  2. 如果您一直在设置fs.s3a.impl选项:停止它。hadoop知道该使用什么,它使用S3AFileSystem类
  3. 如果您看到它们并且正在运行EMR,那么这就是EMR的连接器。源代码已关闭,超出范围。
 类似资料:
  • 我正在处理非常长的嵌套JSON文件中的数据。问题是,这些文件的结构并不总是相同的,因为有些文件缺少其他文件的列。我想从一个包含所有列的空JSON文件创建一个定制模式。如果我稍后将JSON文件读入这个预定义的模式,不存在的列将被空值填充(至少计划是这样的)。到目前为止我所做的: 将测试 JSON(不包含预期的所有列)加载到数据帧中 将其架构写入 JSON 文件 在文本编辑器中打开此 JSON 文件并

  • 场景: 根据少数规则奖励学生。在这里,每个学生都有资格获得多项奖励,如成绩为80分的学生可以同时获得award\u 65和award\u 75。因此,在评估结果后,我想要一份学生有资格获得的奖项列表 这个用例有点不切实际,但我尝试将其与我的问题进行类比(无法发布,因为它包含敏感信息) 示例规则如下: 如果一个学生有80分,那么根据当前的分数(即没有元数据相关行),我可以在结果列表中获得两个对象 D

  • 如何基于应用程序动态定义bean。yml文件? 例如,YAML文件如下所示: 这将动态创建两个带有内容类型标题集的HTTPHeader。 下面是我现在如何定义bean: 如果我需要添加更多endpoint,我需要复制并粘贴这些bean,这是我想要避免的。 注意:这些动态bean不需要任何其他bean。我不确定这是否有什么不同。它只需要加载配置。

  • 问题内容: 我需要并行执行作业,并且遇到了这个名为Jenkins的名为MultiJob插件的插件。 浏览完文档后,我创建了阶段并给出了工作名称。但是我基本上在哪里创建作业。我的意思是作业“ TaskToExecute1”和“ TasktoExecute2”的脚本,构建步骤和构建后步骤。 谢谢,VVP 问题答案: 您需要创建从“作业名称”引用的作业。 因此,在您的示例中,创建一个单独的作业/项目(例

  • 石英2.2 我在运行时动态创建和调度Quartz作业,并将Quartz配置为JDBC-Job-Store。这些作业需要在应用程序执行之间保持不变。在作业执行期间,我需要访问完整的Spring上下文(Spring管理的bean和JPA事务)。 然而,如果我试图将任何东西自动加入到我的工作中,那么我会得到一个错误,比如…“通过字段MyAutowiredField表示的不满足的依赖项” 我想不通。我已经

  • 我试图弄清楚如何通过使用与哥伦布()函数并在pySpark中的cp_codeset列()函数中调用udf来动态地为列表中的每个项目创建列(在这种情况下为列表)。下面是我写的代码,但它给了我一个错误。 另一种选择是手动执行它,但在这种情况下,我必须编写相同的udf函数并使用养分()函数调用它75次(这是cp_codeset["col_names"]的大小) 下面是我的两个数据帧,我正在尝试获得结果如