当前位置: 首页 > 知识库问答 >
问题:

收集Spark群集外的数据时内存不足错误

经正祥
2023-03-14
    null

当我运行上面的代码,然后缓存该表以激发内存时,它占用的内存<2GB-与集群可用的内存相比很小-然后当我试图收集数据到驱动程序节点时,我会得到一个OOM错误。

我已尝试在以下设置上运行:

  • 具有32个内核和244GB RAM的计算机上的本地模式
  • 具有10 x 6.2GB执行程序和61GB驱动程序节点的独立模式

我的问题:

  1. 缓存后占用空间如此之少的数据文件怎么会导致内存问题?
  2. 在我转向可能损害性能的其他选项之前,是否有一些明显的东西需要我检查/更改/疑难解答以帮助修复该问题?

谢谢。

    null
#__________________________________________________________________________________________________________________________________

# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________

firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'

library(dplyr)
library(stringr)
library(sparklyr)

#__________________________________________________________________________________________________________________________________

# Configure & connect to spark
#__________________________________________________________________________________________________________________________________

Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions

# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')

#__________________________________________________________________________________________________________________________________

# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________

#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++

spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory

#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++

# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1

# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 

# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)

#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++

# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory

  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%

  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%

  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%

  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))

# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")

#__________________________________________________________________________________________________________________________________

# Collect out of spark
#__________________________________________________________________________________________________________________________________

myDF <- collect(dftbl)

共有1个答案

白念
2023-03-14

当您说在dataframe上收集时,会发生两件事,

  1. 首先,必须将所有数据写入驱动程序的输出。
  2. 驱动程序必须从所有节点收集数据并保存在内存中。

答:

 类似资料:
  • 如果我只有一个内存为25 GB的执行器,并且如果它一次只能运行一个任务,那么是否可以处理(转换和操作)1 TB的数据?如果可以,那么将如何读取它以及中间数据将存储在哪里? 同样对于相同的场景,如果hadoop文件有300个输入拆分,那么RDD中会有300个分区,那么在这种情况下这些分区会在哪里?它会只保留在hadoop磁盘上并且我的单个任务会运行300次吗?

  • 下面是我的代码: 每次尝试运行此代码时,我都会得到Java.lang.OutofMemoryError:Java堆空间。据我所知,Spark正在我的主节点上执行collect()操作。那么有没有什么方法可以增加内存,使它能够运行程序呢?

  • 当我将大数据保存到hdfs时,我正在体验OOME 我在Spark-Submit中使用这个: 当我增加框架时,现在的错误是:Java.lang.outofMemoryError:Java堆空间,所以我必须将驱动程序内存和执行程序内存增加到2G才能工作。如果累加Collection.value.length是500,000,我需要使用3G。这正常吗? 该文件只有146MB,包含200,000行(对于2

  • 我目前正在一个小型集群(3个节点,32个CPU和128 GB Ram)上使用线性回归(Spark ML)中的基准测试来评估Spark 2.1.0。我只测量了参数计算的时间(不包括启动、数据加载、……)并识别了以下行为。对于小的数据集,0.1MIO-3MIO数据点,测量的时间并没有真正增加,停留在大约40秒。只有对于较大的数据集,如300个Mio数据点,处理时间才会达到200秒。因此,集群似乎根本无

  • 我们有一个Hadoop集群,数据节点为275个节点(55Tb总内存,12000个VCore)。这个集群与几个项目共享,我们有一个YARN队列分配给我们,资源有限。 为了提高性能,我们正在考虑为我们的项目构建一个单独的Spark集群(在同一网络中的Mesos上)并访问Hadoop集群上的HDFS数据。 正如Spark文档中提到的:https://spark.apache.org/docs/lates

  • 我创建了一个AWS密钥对。 我在这里逐字逐句地遵循指示:https://aws.amazon.com/articles/4926593393724923 当我键入“aws emr创建集群——名称SparkCluster——ami版本3.2——实例类型m3.xlarge——实例计数3——ec2属性KeyName=MYKEY——应用程序名称=Hive——引导操作路径=s3://support.elas