当前位置: 首页 > 知识库问答 >
问题:

使用pyspark将数据帧写入Kafka时的异常

阎功
2023-03-14

我正在尝试创建一个dataframe new\u df,并使用pyspark将数据帧加载到Kafka。然而,我很少有例外。不知道到底是什么问题。任何帮助都将不胜感激。

>>> dict = [{'name': 'Alice', 'age': 1},{'name': 'Again', 'age': 2}]
>>> df = spark.createDataFrame(dict)

>>> import time
>>> import datetime
>>> from pyspark.streaming.kafka import KafkaUtils
>>> timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
>>> type(timestamp)
<class 'str'>

>>> from pyspark.sql.functions import lit,unix_timestamp
>>> timestamp
'2017-08-02 16:16:14'
>>> new_df = df.withColumn('time',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
>>> new_df.show(truncate = False)
+---+-----+---------------------+
|age|name |time                 |
+---+-----+---------------------+
|1  |Alice|2017-08-02 16:16:14.0|
|2  |Again|2017-08-02 16:16:14.0|
+---+-----+---------------------+

现在我正试图将数据框架与Kafka主题联系起来

def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc:9092") \
                .option("topic", "test_topic") \
                .save()



writeToKafka(new_df)

异常(从错误中选取):

org.apache.spark.SparkException: Job aborted due to stage failure: 
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

完全错误:

Py4JJavaError:调用o1811时出错。保存:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段76.0中的任务8失败1次,最近的失败:阶段76.0中的任务8.0丢失(TID 110,localhost,executor driver):org。阿帕奇。Kafka。常见的KafkaException:无法在组织上构造kafka producer。阿帕奇。Kafka。客户。制作人Kafka制作人。(KafkaProducer.java:432)网址:org。阿帕奇。Kafka。客户。制作人Kafka制作人。(KafkaProducer.java:270)位于org。阿帕奇。火花sql。Kafka010。CachedKafkaProducer美元。org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(CachedKafkaProducer.scala:67)位于org。阿帕奇。火花sql。Kafka010。CachedKafkaProducer$$不超过1美元。在组织中加载(CachedKafkaProducer.scala:46)。阿帕奇。火花sql。Kafka010。CachedKafkaProducer$$不超过1美元。在组织中加载(CachedKafkaProducer.scala:43)。spark\u项目。番石榴隐藏物LocalCache$LoadingValueReference。loadFuture(LocalCache.java:3599)位于org。spark\u项目。番石榴隐藏物LocalCache$段。位于org的loadSync(LocalCache.java:2379)。spark\u项目。番石榴隐藏物LocalCache$段。位于org的lockedGetOrLoad(LocalCache.java:2342)。spark\u项目。番石榴隐藏物LocalCache$段。在org上获取(LocalCache.java:2257)。spark\u项目。番石榴隐藏物本地缓存。在org上获取(LocalCache.java:4000)。spark\u项目。番石榴隐藏物本地缓存。位于org的getOrLoad(LocalCache.java:4004)。spark\u项目。番石榴隐藏物LocalCache$LocalLoadingCache。在org上获取(LocalCache.java:4874)。阿帕奇。火花sql。Kafka010。CachedKafkaProducer美元。getOrCreate(CachedKafkaProducer.scala:80)位于org。阿帕奇。火花sql。Kafka010。KafkaWriteTask。在组织中执行(KafkaWriteTask.scala:44)。阿帕奇。火花sql。Kafka010。KafkaWriter$$anonfun$写入$1$$anonfun$应用$1。在组织上应用$mcV$sp(KafkaWriter.scala:89)。阿帕奇。火花sql。Kafka010。KafkaWriter$$anonfun$写入$1$$anonfun$应用$1。在org上应用(KafkaWriter.scala:89)。阿帕奇。火花sql。Kafka010。KafkaWriter$$anonfun$写入$1$$anonfun$应用$1。在org上应用(KafkaWriter.scala:89)。阿帕奇。火花util。Utils美元。tryWithSafeFinally(Utils.scala:1360)位于org。阿帕奇。火花sql。Kafka010。KafkaWriter$$anonfun$写入$1。在org上应用(KafkaWriter.scala:89)。阿帕奇。火花sql。Kafka010。KafkaWriter$$anonfun$写入$1。在org上应用(KafkaWriter.scala:87)。阿帕奇。火花rdd。RDD$$anonfun$foreachPartition$1$$anonfun$apply$28。在组织中应用(RDD.scala:980)。阿帕奇。火花rdd。RDD$$anonfun$foreachPartition$1$$anonfun$apply$28。在组织中应用(RDD.scala:980)。阿帕奇。火花SparkContext$$anonfun$runJob$5。在组织中应用(SparkContext.scala:2101)。阿帕奇。火花SparkContext$$anonfun$runJob$5。在组织中应用(SparkContext.scala:2101)。阿帕奇。火花调度程序。结果任务。在组织上运行任务(ResultTask.scala:90)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:123)。阿帕奇。火花执行人。执行人$TaskRunner$anonfun$10。在组织中应用(Executor.scala:408)。阿帕奇。火花util。Utils美元。tryWithSafeFinally(Utils.scala:1360)位于org。阿帕奇。火花执行人。执行者$TaskRunner。在java上运行(Executor.scala:414)。util。同时发生的线程池执行器。java上的runWorker(ThreadPoolExecutor.java:1149)。util。同时发生的ThreadPoolExecutor$工作者。在java上运行(ThreadPoolExecutor.java:624)。lang.Thread。运行(Thread.java:748)由:org引起。阿帕奇。Kafka。常见的配置。ConfigException:引导中未提供可解析的引导URL。组织中的服务器。阿帕奇。Kafka。客户。客户端。org上的parseandvalidateaddress(ClientUtils.java:88)。阿帕奇。Kafka。客户。客户端。org上的parseandvalidateaddress(ClientUtils.java:47)。阿帕奇。Kafka。客户。制作人Kafka制作人。(KafkaProducer.java:407)。。。31个以上

我有3个Kafka经纪人,3个Kafka动物园管理员,托管在库伯内特斯集群。

共有1个答案

赏彭薄
2023-03-14
def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc.test_namespace:9092") \
                .option("topic", "test_topic") \
                .save()

kafka代理位于kubernetes集群中的另一个命名空间上。而且,我的jupyter笔记本位于另一个命名空间上。

一旦我尝试使用kafka_service.namespace: portno作为kafka.bootstrap.servers(即kafka-svc.test_namespace: 9092),它就成功了

kafka-svc - is the kafka service name.
test_namespace - is the name of the namespace where kafka brokers are hosted
 类似资料:
  • 我想将一个数据帧保存到两个不同的csv文件中(拆分数据帧)-一个文件只包含标题,另一个文件包含其余行。 我想将这两个文件保存在同一个目录下,这样Spark处理所有逻辑将是最好的选择,如果可能的话,而不是使用pandas分割csv文件。 最有效的方法是什么? 谢谢你的帮助!

  • 问题内容: 我跟随本文将一些数据发送到AWS ES,并使用了jar elasticsearch-hadoop。这是我的脚本: 然后运行以下命令行: 其中write_to_es.py是上面的脚本。 这是我得到的错误: 如何解决这个问题: 任何帮助或建议,我们将不胜感激。 问题答案: 我有同样的问题。 看完这篇文章,我找到了答案!!! 您必须像这样转换为Type:

  • 我有一个名为df的数据库数据帧。我想将它作为csv文件写入S3存储桶。我有S3存储桶名称和其他凭据。我检查了这里给出的在线留档https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html#mount-aws-s3它说使用以下命令 但我有的是数据帧,而不是文件。怎么才能实现?

  • 显然它无法解码数据。有什么想法吗?

  • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr

  • 我正在尝试使用Databricks的spark-csv2.10依赖关系将一个数据帧写入到HDFS的*.csv文件。依赖关系似乎可以正常工作,因为我可以将.csv文件读入数据帧。但是当我执行写操作时,我会得到以下错误。将头写入文件后会出现异常。 当我将查询更改为时,write工作很好。 有谁能帮我一下吗? 编辑:根据Chandan的请求,这里是的结果