当前位置: 首页 > 知识库问答 >
问题:

pyspark-Kafka流-内存不足

濮书
2023-03-14

我正在尝试使用此代码使用代理版本0.10测试kafka流。这只是一个打印主题内容的简单代码。还没什么大不了的!但是,由于某种原因内存不足(VM中的10GB RAM)!代码:

# coding: utf-8

"""
kafka-test-003.py: test with broker 0.10(new Spark Stream API)

How to run this script?

spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar,jars/kafka-clients-0.11.0.0.jar kafka-test-003.py



"""


import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession,Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# starting spark session
spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# getting streaming context
sc = spark.sparkContext
ssc = StreamingContext(sc, 2) # batching duration: each 2 seconds

broker = "kafka.some.address:9092"
topic = "my.topic"

### Streaming

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", broker) \
  .option("startingOffsets", "earliest") \
  .option("subscribe", topic) \
  .load() \
  .select(col('key').cast("string"),col('value').cast("string"))

query = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .outputMode("append") \
  .format("console") \
  .start()

### End Streaming

query.awaitTermination()

运行火花提交:

spark-submit --master local[*] --driver-memory 5G --executor-memory 5G --jars jars/kafka-clients-0.11.0.0.jar,jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-003.py

不幸的是,结果是:

java.lang.OutOfMemoryError:Java堆空间

我假设Kafka每次应该带一小部分数据来避免这个问题,对吗?那么,我做错了什么?

共有1个答案

郎子平
2023-03-14

spark内存管理是一个复杂的过程。最佳解决方案不仅取决于您的数据和操作类型以及系统行为,您可以重试以下spark命令:

spark submit--master local[*]--驱动程序内存4G--执行器内存2G--执行器内核5--num executors 8--jars jars/kafka-clients-0.11.0.0。jar,jars/spark-sql-kafka-0-10\u 2.11-2.3.0。jar kafka-test-003。py公司

您能否通过调整性能,按照以下链接调整上述内存参数?使用spark submit,--total executor cores选项的行为是什么?

 类似资料:
  • 我在Kafka·吉拉也描述了这个问题:https://issues.apache.org/jira/browse/KAFKA-13014 我们有多个实例和线程的Kafka流。 这个Kafka流消耗了很多话题。 其中一个主题分区一天内无法访问,主题保留时间为4小时。 解决问题后,Kafka流正试图从不再存在的偏移量中消费: Kafka消费群体描述: 我们可以看到KS正在等待的当前偏移量是 Kafka

  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 当我运行用pyspark编写的spark作业时,我会运行一个jvm,它的Xmx1g设置似乎无法设置。以下是ps aux的输出: 我的问题是,如何设置此属性?我可以使用SPARK\u DAEMON\u memory和SPARK\u DRIVER\u memory设置主内存,但这不会影响pyspark的派生进程。 我已经尝试了JAVA\u选项,或者实际查看了包的文件,但无法理解这是在哪里设置的。 设置

  • 我在使用SharpZipLib的GZipInputStream编写未压缩的GZIP流时遇到问题。我似乎只能获得256字节的数据,其余的数据没有写入并保留为零。已检查压缩流(compressedSection),所有数据都在那里(1500字节)。解压缩过程的片段如下: 因此,在这段代码中: 1) 压缩的部分被传入,准备解压缩。 2) 未压缩输出的预期大小(以2字节小endian值的形式存储在文件头中

  • 我正在尝试使用pyspark学习以下hello word级别的示例,例如下面的示例。我得到了一个“MethodisBarrier([])不存在”错误,代码下面包含了完整的错误。 尽管如此,当我在命令行中直接启动pyspark会话并键入相同的代码时,它工作得很好: 我的设置: windows 10 Pro x64

  • 我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。 这里是平均值的函数: 以下是例外情况: 问题是基本目录不存在,但我希望kafka流在必要时创建目录。 编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。 1个处理器的配置文件: 2个处理器的配置文件: 现在我启动处理器: 类型元组包含配置文