Question:

Spark Streaming Kafka consumer (Avro) - AttributeError: 'dict' object has no attribute 'split'

崔高远
2023-03-14

I am trying to build a Spark Streaming application that consumes Avro-formatted messages from a Kafka topic, but I am running into problems with the Confluent message deserializer.

Following the instructions in "Spark Python Avro Kafka Deserialiser", I got the Kafka consumer to deserialize messages correctly, but I could not get the PythonStreamingDirectKafkaWordCount example to run on top of it.

Code:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://127.0.0.1:8081')
serializer = MessageSerializer(schema_registry_client)

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    sc.setLogLevel("WARN")
    kvs = KafkaUtils.createDirectStream(ssc, ["avrotest2"], {"metadata.broker.list": "localhost:9092"}, valueDecoder=serializer.decode_message)
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

spark-submit CLI:

/opt/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars spark-streaming-kafka-0-8_2.11-2.0.0.jar smartbus-stream-app_avro2.py

lines.pprint() output:

{u'temperature': 21.0, u'max_capacity': 44, u'equip_type': u'Autocarro', u'id': u'CARRIS_502', u'tire_pressure': 4.974999904632568, u'humidity': 21.0, u'equip_category': u'Carruagem_Unica', u'users_out': 2.0, u'equip_brand': u'Volvo', u'battery_status': 99.5, u'equip_fuel': u'Biodiesel', u'fuel': 39.79999923706055, u'equip_model': u'3d', u'aqi_sensor': 5.0, u'seated_capacity': 32, u'users_in': 3.0, u'location': u'38.760780, -9.166853'}

Stack trace output:

2018-03-17 03:47:06 ERROR Executor:91 - Exception in task 0.0 in stage 36.0 (TID 29)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1857, in combineLocally
  File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "/root/ss_app/smartbus-stream-app_avro2.py", line 17, in <lambda>
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
AttributeError: 'dict' object has no attribute 'split'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I could not really find any more details about this. Could anyone shed some light on the problem?

Thanks in advance.

1 answer

沃威
2023-03-14

It throws this error because you are trying to call split() on a dict. Your valueDecoder already deserializes each Avro message into a Python dictionary, so inside the flatMap each "line" is a dict, not a string. Calling split on it therefore raises AttributeError: 'dict' object has no attribute 'split'. You need to map each record to a string field first (or otherwise convert it to text) before splitting.
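Since each stream element is already a deserialized record, the word-count stage needs a string to split on. The sketch below simulates the fix in plain Python (no Spark), assuming records shaped like the dict in the pprint() output above; `equip_brand` is just an example field chosen for illustration:

```python
from collections import Counter

# Each element of the stream is a deserialized Avro record (a dict),
# like the one shown in the pprint() output above.
records = [
    {"equip_brand": "Volvo", "equip_fuel": "Biodiesel"},
    {"equip_brand": "Volvo", "equip_fuel": "Diesel"},
]

# Instead of line.split(" "), pull a string field out of the dict first,
# then split that string into words.
words = [word
         for rec in records
         for word in rec["equip_brand"].split(" ")]

# Counter plays the role of map + reduceByKey in the streaming job.
counts = Counter(words)
print(counts)  # Counter({'Volvo': 2})
```

In the streaming job itself, the equivalent change would be to replace `line.split(" ")` with something like `line["equip_brand"].split(" ")` inside the flatMap (substituting whichever field of the record you actually want to count).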
