当前位置: 首页 > 知识库问答 >
问题:

无法将Pyspark数据帧发送到Kafka主题

双元魁
2023-03-14

我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误:

Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr/local/rms/lib/hdp26_c5000/park2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/utils.py", line 71, in deco引发Analysis Exception(s.split (': ', 1)[1], stackTrace)pyspark.sql.utils.Analysis Exception: u"对未解析对象上的属性的无效调用,tree: un分辨率vedalias('shop_id,无)"

我使用的代码如下:

from pyspark.sql import SparkSession
from pyspark.sql import functions

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.debug.maxToStringFields", 100000) \
.getOrCreate()

df = spark.sql('''select distinct shop_id, item_id 
from sale.data
''')

df.selectExpr("shop_id", "item_id") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "myserver.local:443") \
.option("topic","test_topic_01") \
.save()

目前使用的版本有:

-火花2.1.1.2.6.2.0-205

-Kafka经纪人0.11

共有1个答案

狄元魁
2023-03-14

Kafka期望在其主题中写入一个键和值。尽管键不是强制性的。它通过查看应该是“键”和“值”的数据帧列的名称来做到这一点。

在查询中,您只选择了“shop_id”列,因此不存在键或值列。错误消息:“UnsolvedAlias('shop\u id,None)”告诉您,“shop\u id”列被选为键(因为它是第一列),但没有任何内容被解释为强制值。

您可以通过将列重命名为“value”来解决问题,例如:

df = spark.sql('''select distinct shop_id, item_id 
from sale.data
''')

df.withColumn("value", col("shop_id").cast(StringType)) \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "myserver.local:443") \
.option("topic","test_topic_01") \
.save()
 类似资料:
  • 我在IBM的数据平台上工作。我能够将数据加载到pyspark数据框架中,并创建了一个spark SQL表。分割数据集后,将其输入分类算法。它会出现诸如spark SQL数据无法加载之类的错误。规定的日期。 错误: TypeError:预期的序列或类似数组,已获取{ 在这个错误之后,我做了这样的事情: 错误: 属性错误回溯(最近一次调用最后一次)在()5 X_序列,y_序列,X_测试,y_测试=序列

  • 我开始构建一些非常简单的应用程序,我想我已经掌握了如何在android Studio中创建基础知识。我一辈子都搞不清楚的是为什么我不能把数据发送到我的firebase实时数据库。 这是一个非常简单的应用程序,我试图让它目前的工作(因为我不能在更丰富的一个)--只需要一个按钮,应该发送“你好,世界!”

  • 我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了以下代码: 但是,它会返回此错误: 使用 Spark 的默认 log4j 配置文件:组织/缓存/火花/log4j-defaults.属性 将默认日志级别设置为“WARN”。要调整日志记录级别,请使用 sc.setLogLevel(新级别)。对于 SparkR,请使用 setLogLevel(新级别)。18/01/08 10:

  • 我正在尝试创建一个dataframe new\u df,并使用pyspark将数据帧加载到Kafka。然而,我很少有例外。不知道到底是什么问题。任何帮助都将不胜感激。 现在我正试图将数据框架与Kafka主题联系起来 异常(从错误中选取): 完全错误: Py4JJavaError:调用o1811时出错。保存:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段76.0中的任务8

  • 我创建了一个python脚本raw\u tweets\u流。py使用twitter api流式传输twitter数据。twitter上的json数据使用下面的脚本传递给kafka producer。

  • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如