我正在AWS EMR上运行一个5节点的Spark集群,每个集群的大小为m3.xLarge(1主、4从)。我成功地运行了一个146MB的bzip2压缩CSV文件,最终得到了一个完美的聚合结果。
现在我正试图在这个集群上处理一个~5GB的bzip2 CSV文件,但收到以下错误:
[
{
"classification":"spark-env",
"properties":{
},
"configurations":[
{
"classification":"export",
"properties":{
"PYSPARK_PYTHON":"python34"
},
"configurations":[
]
}
]
},
{
"classification":"spark",
"properties":{
"maximizeResourceAllocation":"true"
},
"configurations":[
]
}
]
def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
.orderBy("timestamp"))
diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
.over(window))
time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
.withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
.orderBy("timestamp")
.rowsBetween(-1, 0))
sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
return sessions
def aggregate_sessions(sessions):
median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
pyspark.sql.functions.first("site_id").alias("site_id"),
pyspark.sql.functions.first("user_id").alias("user_id"),
pyspark.sql.functions.count("id").alias("hits"),
pyspark.sql.functions.min("timestamp").alias("start"),
pyspark.sql.functions.max("timestamp").alias("finish"),
median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
)
return aggregated
spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
header=True,
inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)
它从这些错误中的一些开始,并停止相同错误的数量增加。
我试着用--conf spark.yarn.executor.memoryeadher运行spark-submit,但这似乎也不能解决问题。
我感觉到你的痛苦..
我们也有类似的问题,即在纱线上使用Spark时内存不足。我们有五个64GB、16个核心VM,无论我们将spark.yarn.executor.memoryoverhead
设置为什么,我们都无法为这些任务获得足够的内存--无论我们给它们多少内存,它们最终都会死亡。这是一个相对简单的Spark应用程序,导致了这种情况的发生。
我们发现VM上的物理内存使用率很低,但虚拟内存使用率非常高(尽管日志抱怨物理内存)。我们将yarn.nodemanager.vmem-check-enabled
中的yarn-site.xml
设置为false
,容器不再被杀死,应用程序似乎按预期工作。
该页面有一个来自IBM的非常有用的页面链接:https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en
总之,glibc>2.10改变了它的内存分配。尽管大量的虚拟内存被分配并不是世界末日,但它不能与YARN的默认设置一起工作。
在hadoop-env.sh
中将yarn.nodemanager.vmem-check-enabled
设置为false,而不是将malloc_arena_max
环境变量设置为低值。此bug报告提供了有关此问题的有用信息:https://issues.apache.org/jira/browse/hadoop-7154
我建议通读两页--这些信息非常方便。
问题内容: 带有 Windows的Docker桌面的Docker Windows容器是否具有默认内存限制?我有一个在容器中运行时崩溃的应用程序,但是当我尝试为命令指定参数时,它似乎运行良好。至少在以前崩溃的情况下。这给我的印象是有默认的内存限制,但是我在文档中找不到它。所以我的问题是是否存在内存限制,并且是否在记录的地方? 问题答案: 根据关于Windows的Docker Github问题的讨论(
我有一个tomcat服务器,可以处理一些rest API请求。这个tomcat崩溃是由于某些输入中的一个特定rest请求内存不足导致的,这会导致大量堆大小的使用,从而导致所有站点崩溃。 我想限制这个Rest请求内存使用我怎么能做到呢?我通常想保护tomcat免受大内存使用请求崩溃的影响。我怎么能做到呢?也许以某种方式限制所有线程最大堆大小?
问题内容: 我正在Ubuntu 13.04(Raring Ringtail)之上运行最新版本的Docker: 但是当我启动容器时 我没有看到任何限制,并且我的内核启用了cgroups内存限制: 我在这里想念什么明显的东西? 问题答案: 将不会显示它,因为这是通过cgroups强制执行的。而是可以在主机(容器外部)上使用和cgroup内存进行检查: 要查看它的内存不足,可以运行一些将使用比分配的内存
我无法理解纱线配置。 我在纱线/MapReduce配置中有这样的行: 这里写着: 默认情况下(“yarn.nodemanager.vmem-pmem-ratio”)设置为2.1。这意味着map或reduce容器最多可以分配2.1倍(“MapReduce.reduce.memory.MB”)或(“MapReduce.map.memory.MB”)的虚拟内存,然后NM才会杀死该容器。 我能得到更好的解
我很好奇如何处理GAE中的内存限制。目前,我有这个应用程序,需要大量的CPU/内存。 我尝试在GAE上使用b8实例运行它(基本上是使用4.8GHz CPU的顶级实例) 我还尝试手动设置CPU的数量 但无论我做什么,我总是达到同样的记忆限制。。。(见下文) GET500 0 B 43 s Unknown/_ah/start在总共处理0个请求后,超过了2048 MB的软内存限制,达到3163 MB。考
问题内容: python的内存有限制吗?我一直在使用python脚本从最小150mb大的文件中计算平均值。 根据文件的大小,我有时会遇到一个。 可以为python分配更多的内存,这样我就不会遇到错误吗? 编辑:下面的代码 注意:文件大小可能相差很大(最大20GB),文件的最小大小为150mb 问题答案: (这是我的第三个答案,因为我误解了您的代码在原始代码中所做的事情,然后在第二个错误中犯了一个小