我试图使用Spark,更具体地说是PySpark和结构化流来消费Kafka。
import os
import time
import time
from ast import literal_eval
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
spark = SparkSession \
.builder \
.appName("Structured Streaming") \
.getOrCreate()
requests = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "ip-ec2:9092") \
.option("subscribe", "ssp.requests") \
.option("startingOffsets", "earliest") \
.load()
requests.printSchema()
# root |-- key: binary (nullable = true) |-- value: binary (nullable =
# true) |-- topic: string (nullable = true) |-- partition: integer
# (nullable = true) |-- offset: long (nullable = true) |-- timestamp:
# timestamp (nullable = true) |-- timestampType: integer (nullable =
# true)
rawQuery = requests \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.trigger(processingTime="5 seconds") \
.format("parquet") \
.option("checkpointLocation", "/home/user/folder/applicationHistory") \
.option("path", "/home/user/folder") \
.start()
rawQuery.awaitTermination()
PY4JJavaError:调用O70时出错。AwaitTermination
我只是将行rawquery.awaittermination()替换为
print(rawQuery.status)
time.sleep(60)
print(rawQuery.status)
rawQuery.stop()
而且管用。
我是PySpark的新手。我一直在用测试样本编写代码。一旦我在更大的文件上运行代码(3gb压缩)。我的代码只做了一些过滤和连接。关于py4J,我一直在出错。 任何帮助都是有益的,我们将不胜感激。 回来 更新:我使用的是py4j 10.7,刚刚更新到10.8 更新(1):添加spark。驾驶员内存: 汇总返回错误: 更新(2):我通过更改spark默认值尝试了这一点。conf文件。仍在获取错误PyS
我在运行Python 3.6.5的Jupyter笔记本和运行3.7.2的Python shell中出现了这个错误。我的操作系统是Windows10。我在这两种环境中都安装了pip pyspark。两者都使用Spark Version2.4.0,而我的Java JDK是Oracle JDK Version8,JDK1.8.0_201。这是我在这两种情况下运行的代码: 这里:Spyder中的PySpa
我想按照spark网站上的说明为spark安装graphframes,但命令: <代码>pyspark--打包graphframes:graphframes:0.8.1-spark3.0-s\u 2.12 不适合我。 我尝试了多种安装方法,但决定继续下载graphframes。jar,将其添加到Spark的常规列表中。jar文件并将其手动添加到代码spark中。sparkContext。addPy
我是一个学生,我真的被Py4JJavaError这个问题卡住了两个星期,在互联网上没有太多;我真的需要帮助: 请帮帮我这是下周要做的项目
当我尝试为kafka启动一个readStream时,我得到了以下错误,我的kafka已经启动并运行,我测试了它多次以确保它正在处理。Kafka主题也被创建。
这是在jupyter笔记本上运行的pyspark代码。 Py4JJavaError:调用None时出错。组织。阿帕奇。火花应用程序编程接口。JAVAJavaSparkContext.:JAVAlang.IllegalAccessError:课堂组织。阿帕奇。火花存储StorageUtils$(在未命名模块@0x30cb5b99中)无法访问sun类。尼奥。DirectBuffer(在模块java.b