当前位置: 首页 > 知识库问答 >
问题:

只有单线程使用多处理池与PySpark执行并行SQL查询

谈琛
2023-03-14

我有一个例子,我使用PySpark(如果不能使用Python而需要使用Scala或Java的话,则使用Spark)从几百个缺少主键的数据库表中提取数据。(为什么Oracle会创建包含带有主键的表的ERP产品是另一个主题...但不管怎样,我们需要能够提取数据并将数据从每个数据库表保存到拼花文件中。)我最初尝试使用Sqoop而不是PySpark,但由于遇到了一些问题,尝试使用PySpark/Spark更有意义。

理想情况下,我希望在我的compute集群中拥有每个任务节点:取一个表的名称,从数据库中查询该表,并在S3中将该表保存为一个拼花文件(或一组拼花文件)。我的第一步是让它在本地独立模式下工作。(如果每个给定表都有一个主键,那么我可以将查询和文件保存过程分区到给定表的不同行集上,并将行分区分布到compute集群中的任务节点上,以并行地执行文件保存操作,但因为Oracle的ERP产品缺少相关表的主键,所以这不是一个选项。)

我能够用PySpark成功地查询目标数据库,并且能够用多线程成功地将数据保存到parquet文件中,但是由于某种原因,只有单线程做任何事情。因此,所发生的情况是,只有一个线程获取一个表名,查询数据库,并将文件作为拼花文件保存到所需的目录中。则作业结束,就像没有执行其他线程一样。我猜可能发生了某种类型的锁定问题。如果我正确理解了这里的注释:如何在一个Sparkcontext中从Pyspark中的单独线程运行多个作业?那么我正在尝试做的应该是可能的,除非存在与执行并行JDBC SQL查询相关的特定问题。

Edit:我特别在寻找一种方法,允许我使用某种类型的线程池,这样我就不需要为我需要处理的每个表手动创建一个线程,并在集群中的任务节点上手动对它们进行负载平衡。

即使我尝试设置:

--master local[*]

而且

--conf 'spark.scheduler.mode=FAIR'
driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()

然后,为了测试多重处理,我在运行Jupyter笔记本的目录中创建了文件sparkmethods.py,并将此方法放入其中:

def testMe(x):
    return x*x

当我跑的时候:

from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))

在我的Jupyter笔记本中,我得到了预期的输出:

# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os
    import os.path
    from pyshtml" target="_blank">park.sql import SparkSession, SQLContext
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "OURCONNECTIONURL;") \
        .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
        .option("dbtable", tableName) \
        .option("user", "USERNAME") \
        .option("password", "PASSWORD") \
        .load()

    filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
    jdbcDF.write.parquet(filePath)
    fileExists = os.path.exists(filePath)
    if(fileExists):
        return (filePath + " exists!")
    else:
        return (filePath + " could not be written!")
import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)

问题是似乎只有一个线程在执行。

当我执行它时,在控制台输出中,我看到它最初包含以下内容:

进程无法访问该文件,因为另一个进程正在使用该文件。系统找不到文件C:\users\devin~1.bos\appdata\local\temp\spark-class-launcher-output-3662.txt。...。

import jaydebeapi
conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",  
                          "OURCONNECTIONURL;", 
                          ["USERNAME", "PASSWORD"], 
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables
['SALES_TERRITORY_PLAN_PARTNER',
 'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
 'ITEM_ACCOUNT_MAP',
 'PRODUCT_TRIAL_STATUS',
 'ACCOUNT_PERIOD_ACTIVITY']

所以,可以想象,如果问题是PySpark不能像这样跨任务节点分发多个查询,那么也许我可以使用jaydebeapi库来进行连接。然而,在这种情况下,我仍然需要一种方法来将JDBC SQL查询的输出写到Parquet文件中(理想情况下,它将利用Spark的模式推断能力),但如果可行,我愿意采用这种方法。

那么,如何成功地查询数据库并将输出并行地保存到Parquet文件(即分布在任务节点上),而不让主节点顺序地执行所有查询呢?

共有1个答案

陶博涉
2023-03-14

在回答我的问题的评论中提供了一些提示,以及这里的答案:如何使用Pyspark并行运行独立的转换?我研究了线程而不是多处理的使用。我更仔细地研究了这里的一个答案:如何在一个Sparkcontext中从Pyspark中的单独线程运行多个作业?并注意到:

from multiprocessing.pool import ThreadPool

我能让它工作,就像这样:

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close() 
pool.join() 
print(*results, sep='\n')

它打印:

C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
 类似资料:
  • 我有一个图像路径列表,我想在进程或线程之间划分,以便每个进程处理列表的某些部分。处理包括从磁盘加载图像,进行一些计算并返回结果。我正在使用Python 2.7 下面是我如何创建辅助进程 我所面临的问题是,当我在initializer函数中记录初始化时间时,我知道worker不是并行初始化的,而是每个worker都以5秒的间隔初始化,下面是供参考的日志 我尝试过使用将同时启动辅助线程 我知道Wind

  • 启动并行处理最简单的方式就是在 Step 配置中加上一个TaskExecutor , 比如,作为 tasklet 的一个属性: <step id="loading"> <tasklet task-executor="taskExecutor">...</tasklet> </step> 上面的示例中, taskExecutor指向了另一个实现 TaskExecutor 接口的Bean. T

  • 问题内容: 我可以选择让用户从FileChooser提交多个文件,以通过一些代码进行处理。结果将是读取文件的IO,然后是对存储数据的实际大量计算。允许用户选择多个文件,并且由于文件处理不依赖于所选的任何其他文件,因此使我的工作变得更加轻松。 此外,用户需要具有按钮列表,每个要取消的任务一个按钮以及“全部取消”按钮。因此,我必须考虑选择性或集体杀死一项或所有任务的能力。 最后一个要求是,我不允许用户

  • 我正在使用线程池执行器更改遗留设计。详情如下:- 遗留:-对于遗留设计,在应用程序启动时创建600个线程。和放置在各种池中,然后在需要时提取这些池,并将任务分配给相应的线程。 新:-在新设计中,我将线程池替换为执行器服务 我观察到的是,对于Executor,在启动时不会创建线程。它们是在从客户端激发请求时创建的。因此,与前一个线程相比,在内存中创建的线程要少得多。 但我的问题是,这样做是否正确,因

  • 本文向大家介绍C#线程执行超时处理与并发线程数控制实例,包括了C#线程执行超时处理与并发线程数控制实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#线程执行超时处理与并发线程数控制的方法。分享给大家供大家参考。具体实现方法如下: 特别说明: 1、为了测试方便,这里对存储过程的执行是模拟的 2、这里限制了并发执行存储过程的最大个数,但并没有对并发线程数进行控制,与文章标题略有不符,但程

  • 我们正在对使用SpringBoot 2.2.2和Spring执行器的应用程序进行性能测试。 我们希望监控: 正在使用多少tomcat线程 有多少tomcat请求正在排队 正在使用多少个ThreadPoolTaskExector线程(我们将@Async与线程池一起用于某些任务) 执行器中是否提供此信息?我看不到需要使用哪些指标。