当前位置: 首页 > 知识库问答 >
问题:

使用PySpark和Kafka,Py4jJavaError的结构化流:调用O70时出错。AwaitTermination

太叔鸿
2023-03-14

我试图使用Spark,更具体地说是PySpark和结构化流来消费Kafka。

import os
import time
import time

from ast import literal_eval
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Structured Streaming") \
    .getOrCreate()

    requests = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2:9092") \
  .option("subscribe", "ssp.requests") \
  .option("startingOffsets", "earliest") \
  .load()

requests.printSchema()

# root  |-- key: binary (nullable = true)  |-- value: binary (nullable =
# true)  |-- topic: string (nullable = true)  |-- partition: integer
# (nullable = true)  |-- offset: long (nullable = true)  |-- timestamp:
# timestamp (nullable = true)  |-- timestampType: integer (nullable =
# true)
rawQuery = requests \
        .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream.trigger(processingTime="5 seconds") \
        .format("parquet") \
        .option("checkpointLocation", "/home/user/folder/applicationHistory") \
        .option("path", "/home/user/folder") \
        .start()
rawQuery.awaitTermination()    

PY4JJavaError:调用O70时出错。AwaitTermination

共有1个答案

海灵均
2023-03-14

我只是将行rawquery.awaittermination()替换为

print(rawQuery.status)
time.sleep(60)
print(rawQuery.status)
rawQuery.stop()

而且管用。

 类似资料:
  • 我是PySpark的新手。我一直在用测试样本编写代码。一旦我在更大的文件上运行代码(3gb压缩)。我的代码只做了一些过滤和连接。关于py4J,我一直在出错。 任何帮助都是有益的,我们将不胜感激。 回来 更新:我使用的是py4j 10.7,刚刚更新到10.8 更新(1):添加spark。驾驶员内存: 汇总返回错误: 更新(2):我通过更改spark默认值尝试了这一点。conf文件。仍在获取错误PyS

  • 我在运行Python 3.6.5的Jupyter笔记本和运行3.7.2的Python shell中出现了这个错误。我的操作系统是Windows10。我在这两种环境中都安装了pip pyspark。两者都使用Spark Version2.4.0,而我的Java JDK是Oracle JDK Version8,JDK1.8.0_201。这是我在这两种情况下运行的代码: 这里:Spyder中的PySpa

  • 我想按照spark网站上的说明为spark安装graphframes,但命令: <代码>pyspark--打包graphframes:graphframes:0.8.1-spark3.0-s\u 2.12 不适合我。 我尝试了多种安装方法,但决定继续下载graphframes。jar,将其添加到Spark的常规列表中。jar文件并将其手动添加到代码spark中。sparkContext。addPy

  • 我是一个学生,我真的被Py4JJavaError这个问题卡住了两个星期,在互联网上没有太多;我真的需要帮助: 请帮帮我这是下周要做的项目

  • 当我尝试为kafka启动一个readStream时,我得到了以下错误,我的kafka已经启动并运行,我测试了它多次以确保它正在处理。Kafka主题也被创建。

  • 这是在jupyter笔记本上运行的pyspark代码。 Py4JJavaError:调用None时出错。组织。阿帕奇。火花应用程序编程接口。JAVAJavaSparkContext.:JAVAlang.IllegalAccessError:课堂组织。阿帕奇。火花存储StorageUtils$(在未命名模块@0x30cb5b99中)无法访问sun类。尼奥。DirectBuffer(在模块java.b