当前位置: 首页 > 知识库问答 >
问题:

从postgreSQL读取100m行到Spark,并写入到parquet

壤驷俊逸
2023-03-14
val connectionProperties = new Properties()
connectionProperties.put("user", "$USER")
connectionProperties.put("password", "$PASSWORD")

// val connection = DriverManager.getConnection("$SERVER", "$USER", "$PASSWORD")
//connection.isClosed()

val jdbc_url = s"jdbc:postgresql://${"$HOST"}:${$PORT}/${"$DB"}"
val df = spark.read.option("inferSchema", true).jdbc(jdbc_url, "$TABLE", connectionProperties)

任何帮助都将非常感谢。

共有1个答案

施翰学
2023-03-14

您可以尝试手动指定架构,以字符串形式读取列,然后手动解析定义用户定义函数的值。

要手动指定模式,您需要编写如下代码

    val schema =
  StructType(
    StructField("your-example-column1", IntegerType, true) ::
    StructField("your-money-column", StringType, true) :: Nil)
    spark.read.schema(schema)

参见Spark Scala API:

    null
 类似资料:
  • 我试图从mysql读取数据,并将其写回s3中的parquet文件,具体分区如下: 我的问题是,它只打开一个到mysql的连接(而不是4个),并且在从mysql获取所有数据之前,它不会写入parquert,因为mysql中的表很大(100M行),进程在OutOfMemory上失败。 有没有办法将Spark配置为打开多个到mysql的连接并将部分数据写入镶木地板?

  • 问题内容: 在Python中,我有以下将不使用文件将行批量加载到Postgresql的方法: 我正在尝试在Go中完成相同的任务。我目前正在将行写入文件,然后将其导入,然后删除该文件。我想像在Python中一样从STDIN导入行。我有: 编辑:进一步,但这不是插入记录: 没有插入任何记录,并且出现无用的错误: 问题答案: 下面的代码应指导您要走的方向: 如果密钥需要动态,则可以从中获取密钥。 请注意

  • 我正在尝试为Arduino制作一个基于Tkinter的GUI,打印传感器值并响应用户输入。 我试图用来消除循环的代码是这个,它不打印任何传感器信息,唯一的输出是“尝试.../dev/ttyACM0”,然后是tkinter窗口打开。从Tkinter导入*导入串行导入时间 另一方面,除了没有tkinter窗口之外,这是完美的。但它会从缓冲区中删除旧输入并读取新输入。 这是受前一段时间不同的stacko

  • 问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配

  • 我正在尝试用套接字从客户端向服务器写入一个结构。 结构是: 这两个程序(服务器、客户机)的结构是相同的,并且这两个程序的结构是相同的。 客户端程序: 结构R*r; 马洛克.. ...(用数据填充结构) write(socket_fd,(void*)r,size of(R)); 服务器程序: 结构R*r; 马洛克.. read(client_fd,(R*)r,size of(R)); 这不是将结构从

  • 我是火花的新手,我找不到这个...我有许多拼花地板文件上传到的位置: 此文件夹的总大小为,。如何将这些文件分块并读取到一个数据包中,如何将所有这些文件加载到一个数据包中? 错误: