I am trying to build a Spark Streaming application that consumes Avro-encoded messages from a Kafka topic, but I am running into problems with the Confluent message deserializer.
Following the instructions in "Spark Python Avro Kafka Deserialiser", I got the Kafka consumer to deserialize the messages correctly, but I then failed to get the PythonStreamingDirectKafkaWordCount example to run.
Code:
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

# Confluent Schema Registry client and Avro message deserializer
schema_registry_client = CachedSchemaRegistryClient(url='http://127.0.0.1:8081')
serializer = MessageSerializer(schema_registry_client)

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    sc.setLogLevel("WARN")

    # Decode each Kafka message value with the Confluent Avro deserializer
    kvs = KafkaUtils.createDirectStream(
        ssc, ["avrotest2"], {"metadata.broker.list": "localhost:9092"},
        valueDecoder=serializer.decode_message)
    lines = kvs.map(lambda x: x[1])
    lines.pprint()

    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
spark-submit CLI:
/opt/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars spark-streaming-kafka-0-8_2.11-2.0.0.jar smartbus-stream-app_avro2.py
lines.pprint() output:
{u'temperature': 21.0, u'max_capacity': 44, u'equip_type': u'Autocarro', u'id': u'CARRIS_502', u'tire_pressure': 4.974999904632568, u'humidity': 21.0, u'equip_category': u'Carruagem_Unica', u'users_out': 2.0, u'equip_brand': u'Volvo', u'battery_status': 99.5, u'equip_fuel': u'Biodiesel', u'fuel': 39.79999923706055, u'equip_model': u'3d', u'aqi_sensor': 5.0, u'seated_capacity': 32, u'users_in': 3.0, u'location': u'38.760780, -9.166853'}
Stack trace output:
2018-03-17 03:47:06 ERROR Executor:91 - Exception in task 0.0 in stage 36.0 (TID 29)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1857, in combineLocally
File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "/root/ss_app/smartbus-stream-app_avro2.py", line 17, in <lambda>
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
AttributeError: 'dict' object has no attribute 'split'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I really can't find any more detail on this. Can anyone shed some light on the problem?
Thanks in advance
It throws this error because you are trying to split a dict. Inside the flatMap, each line is a dictionary (the decoded Avro record you can see in the pprint output), not a string, so line.split(" ") raises AttributeError: 'dict' object has no attribute 'split'.
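A minimal sketch of one way to fix it, assuming you want to word-count a single string field of each decoded record ("equip_brand" here is just an example field taken from the pprint output above; substitute whichever field you actually need):

    # Each element of `lines` is a dict decoded from Avro, not a string,
    # so extract a string field from the record before splitting.
    # "equip_brand" is only an example field from the pprint output above.
    words = lines.flatMap(lambda record: record.get("equip_brand", "").split(" "))
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()

If you instead want to count words across all the fields, you could flatMap over record.values(), keep only the string values, and split those.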