当前位置: 首页 > 知识库问答 >
问题:

Google Cloud Dataproc最后阶段作业失败引发的火花

轩辕炎彬
2023-03-14

我在Dataproc上使用Spark集群,但我的作业在处理结束时失败了。

我的数据源是Google Cloud Storage上csv格式的文本日志文件(总量为3.5TB,5000个文件)。

处理逻辑如下:

  • 将文件读到DataFrame(模式[“timestamp”,“message”]);
  • 将所有邮件分组到1秒的窗口中;
  • 对每个分组消息应用管道[tokenizer->HashingTF]以提取单词及其频率来构建特征向量;
  • 在GCS上保存带有时间线的特征向量。

我遇到的问题是,在很小的数据子集(比如10个文件)上处理效果很好,但当我在所有文件上运行它时,它最终会失败,出现类似“容器因超出内存限制而被纱线杀死。使用了25.0 GB的24 GB物理内存。考虑增加spark.YARN.executor.memoryoverhead”这样的错误。

我的集群有25个使用n1-highmem-8机器的工人。因此,我搜索了这个错误,并将“spark.yarn.executor.memoryoverhead”参数增加到6500MB。

现在我的spark作业仍然失败,但是错误是“由于阶段失败而中止作业:4293个任务的序列化结果的总大小(1920.0MB)大于spark.driver.maxResultSize(1920.0MB)”

我是spark的新手,我认为我做了一些错误的事情,或者是在配置级别,或者是在我的代码中。如果你能帮我把这些东西清理干净,那就太好了!

下面是spark任务的代码:

import logging
import string
from datetime import datetime

import pyspark
import re
from pyspark.sql import SparkSession

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'

sc = pyspark.SparkContext()
spark = SparkSession\
        .builder\
        .appName("LogsVectorizer")\
        .getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)

logger.info("Start log processing at {}...".format(NOW))

# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'  
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)  
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)


# CSV data schema to build DataFrame
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("message", StringType())])

# Helpers to clean strings in log fields
def cleaning_string(s):
    try:
        # Remove ids (like: app[2352] -> app)
        s = re.sub('\[.*\]', 'IDTAG', s)
        if s == '':
            s = 'EMPTY'
    except Exception as e:
        print("Skip string with exception {}".format(e))
    return s

def normalize_string(s):
    try:
        # Remove punctuation
        s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
        # Remove digits
        s = re.sub('\d*', '', s)
        # Remove extra spaces
        s = ' '.join(s.split())
    except Exception as e:
        print("Skip string with exception {}".format(e)) 
    return s

def line_splitter(line):
    line = line.split(',')
    timestamp = line[0]
    full_message = ' '.join(line[1:])
    full_message = normalize_string(cleaning_string(full_message))
    return [timestamp, full_message]

# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)

# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))

# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())

# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
                       .agg(join_(F.collect_list("message")).alias("messages"))

# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)

pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])

logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)

logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
                   .withColumn("tokens_counts", asDense(logs_csv.tokens_counts))

# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)

# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)

共有1个答案

宋景福
2023-03-14

spark.driver.maxResultSize是驱动程序大小的问题,在Dataproc中,驱动程序运行在主节点上。

默认情况下,主程序内存的1/4给Driver,其中1/2给设置为Spark.Driver.maxresultsize(最大的RDD Spark允许您.collect()

我猜tokenizerhashingtf在驱动程序中移动“元数据”,其大小与您的密钥空间大小相同。要增加允许的大小,可以增加spark.driver.maxResultSize,但也可以增加spark.driver.memory和/或使用更大的master。Spark的配置指南有更多的信息。

 类似资料:
  • 我有一个spark作业,它连接2个数据集,执行一些转换,并减少数据以给出输出。现在的输入大小相当小(每个200MB数据集),但是在join之后,正如您在DAG中所看到的,作业会被卡住,并且不会继续进行第4阶段。我试着等了几个小时,它给了OOM并显示了第四阶段的失败任务。 为什么spark在stage-3(连接阶段)之后不显示stage-4(数据转换阶段)为活动的?它是不是在第3阶段和第4阶段之间徘

  • 我正在处理一些奇怪的错误信息,我认为这可以归结为内存问题,但我很难确定它,可以从专家那里得到一些指导。 我有一个两台机器的Spark(1.0.1)集群。两台机器都有8个核心;一台有16GB内存,另一台有32GB内存(这是主)。我的应用程序涉及计算图像中的成对像素亲和力,尽管我测试的图像到目前为止只有1920x1200大,16x16小。 我确实必须改变一些内存和并行性设置,否则我会得到显式的OutO

  • 我目前正在尝试为一个项目设置Elasticsearch。我已经安装了,还安装了Java,即。 但是当我尝试使用以下命令启动Elasticsearch时 我得到以下错误 loaded:loaded(/usr/lib/systemd/system/elasticsearch.service;disabled;vend 活动:自世界协调时2019-11-01 06:09:54开始失败(结果:退出-代码)

  • 我的理解是,这是SparkR的一个非常基本的功能,所以我真的不知道为什么它不能工作。由于某种原因,当我使用直接从数据源读取时,我没有问题。还有,我注意到错误行中的数字“:stage xxx中的task 0..”每次失败时递增1。 我还注意到,错误似乎来自于执行程序找不到的二进制文件,尽管我不确定为什么这只会发生在从本地data.frames创建的SparkDataFrames中,而不是从外部数据源

  • 我得到了“ExecutorLostFailure(Executor1 lost)”。 我已经尝试了大部分的Spark调优配置。我已经减少到一个执行人失去,因为最初我得到了像6个执行人失败。 以下是我的配置(我的spark-submit):

  • 按照本帖中的说明操作(https://spark.apache.org/docs/latest/sparkr.html#from-本地数据帧)我正在使用以下代码创建sparkdataframe: 但是函数总是导致以下错误。我得到同样的错误,当我试图运行以及。我也尝试过而不是createDataFrame。我还尝试在我的ipython笔记本中重新启动内核,并重新启动我的火花会话。 我的理解是,这是S