当前位置: 首页 > 知识库问答 >
问题:

将PySpark RDD上载到BigQuery

梅跃
2023-03-14

我从BQ下载了一个表格到PySpark RDD,如下所示。我如何再次上传它?

dGSConfig = {
    'project_id': "project_id",
    'bucket': "bucket_id"
}
dBQConfig = {
    'gs_config': dGSConfig,
    'project_id': "project_id",
    'dataset_id': "dataset_id",
    'table_id': "table_id"
}

oSc = instantiate_pyspark()
rddData, lsHeadings = get_table_cloud(oSc, dBQConfig)  #rddData has a list-of-lists type format




def instantiate_pyspark():
    """instantiate the pyspark RDD stuff"""
    import pyspark

    oSc = pyspark.SparkContext()
    oHadoopConf = oSc._jsc.hadoopConfiguration()
    oHadoopConf.get("fs.gs.system.bucket")

    return oSc


def get_table_cloud(oSc, dBQConfig):
    """get a table from bigquery via google cloud storage
    Config format:
        dGSConfig = {'project_id': '', 'bucket':  ''}
        dBQConfig = {'project_id: '', 'dataset_id': '', 'table_id': ''}
    """
    dGSConfig = dBQConfig['gs_config']

    dConf = {
        "mapred.bq.project.id": dGSConfig['project_id'],
        "mapred.bq.gcs.bucket": dGSConfig['bucket'],
        "mapred.bq.input.project.id": dBQConfig['project_id'],
        "mapred.bq.input.dataset.id":dBQConfig['dataset_id'],
        "mapred.bq.input.table.id": dBQConfig['table_id']
    }

    rddDatasetRaw = oSc.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=dConf
    )

    import json
    lsHeadings = json.loads(rddDatasetRaw.take(1)[0][1]).keys()

    rddDataset = (
        rddDatasetRaw
        .map(lambda t, json=json: json.loads(t[1]).values() )
    )

    return rddDataset, lsHeadings

共有2个答案

程天佑
2023-03-14

我曾经用过的三种方法:

1) 创建本地csv,上传到google存储,单独的进程进入BigQuery:

llData = rddData.collect()


with open(sCsvPath, 'w') as f:
    import csv
    oWriter = csv.writer(f)
    for lData in llData:
        oWriter.writerow(lData)

import subprocess
lsCommand = ['gsutil', 'cp', sCsvPath, sGooglePath]
subprocess.check_output(lsCommand)

2) 使用Pandas直接上传到BigQuery:

import pandas as pd
dfData = pd.DataFrame(llData, columns=lsHeadings)

sProjectID = dBQConfig['sProjectID']
sTargetDataset = dBQConfig['sTargetDataset']
sTargetTable = dBQConfig['sTargetTable']

sTablePath = "{}.{}".format(sTargetDataset, sTargetTable)
dfData.to_gbq(sTablePath, sProjectID, if_exists='replace')

3) 使用 pyspark 将分布式结果直接保存到存储中:

#remove previous dir if exists
import subprocess
lsCommand = ['gsutil', 'rm', '-r', sGooglePath]
subprocess.check_output(lsCommand)

rddSave.saveAsTextFile(sGooglePath)

虽然这些都不是我最初想要的,但这是一种将结果直接上传到 BQ 的 PySpark 方式。

澹台承载
2023-03-14

您可以导出到一些中间文件,然后将这些文件加载到BigQuery中。

这可能会有所帮助:如何将pyspark中的表格数据帧导出为csv?

 类似资料:
  • 我正在尝试将应用程序中的数组上载到Firestone。然而,它继续显示 使用无效数据调用了“firebaseError:function fieldvalue.arrayUnion()”。 这是我的代码: 请注意,数据库中还不存在数组“players”。

  • 我目前正在研究在测试套件执行后根据testng XML中的参数将testng报告上传到Amazon S3的能力。考虑以下testng XML: 给定前四个参数(upload-test-reports、aws-provider-type、aws-s3-bucket和target-reports-directory),Java项目将把默认测试输出目录(当前为/build/reports/tests)中

  • 我一直尝试将此csv上传到google Bigquery,但我总是出错。 读取数据时出错,错误消息:CSV表遇到太多错误,放弃。行数:303;错误:1。有关详细信息,请查看错误[]集合。读取数据时出错,错误消息:无效时区:PM;无法将“09/09/2006 11:45:00 PM”解析为从位置71061开始的字段日期(位置2)的日期时间 是这个csv文件。我得到了上面的错误。 https://ib

  • 我是spring boot上的wokring,我已经在这个路径上创建了一个文件夹:myApp/src/web/images 在做的时候 我得到了这个错误: Java语言nio。文件NoSuchFileException:/private/var/folders/4g/wd\u lgz8970sfh64zm38lwfhw0000gn/T/tomcat docbase。8255351399752894

  • 在application.properties中,我将java和maven版本指定为 无法执行目标org.apache.maven.plugins:maven-compiler-plugin:3.8.1:在项目上编译(default-compile)heroku-spring:编译时出现致命错误:无效的目标版本:11 我该怎么解决呢?

  • 我试图使用两个成功部署的云函数和一个成功运行的云调度器来自动化向BigQuery上传JSON数据。运行云调度器后,数据会上传到云存储中,但不会上传到BigQuery。 下面是我的代码和JSON数据: 这就是我的JSON数据的样子: 请指教。