我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。
spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\sparktest\structured_streaming.py
这是我从Spark github上取的代码
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)")
words = lines.select(
# explode turns each item in an array into a separate row
explode(
split(lines.value, ' ')
).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.start()
query.awaitTermination()
你的方式是正确的,但不幸的是,Kafka0.10还不支持PySpark。正如你在火星-16534中所看到的。
到目前为止,对pySpark的唯一支持是Kafka0.8。因此,您可以迁移到Spark0.8或将代码更改为Scala。
我有一个用于结构化流媒体的Kafka和Spark应用程序。特别是我的KafkaProducer具有以下配置: 然后我创建了一个ProducerRecord,如下所示: 其中,json。toString()表示一个JSON格式的字符串,这是我想在Spark中处理的值。现在,我主要做的是将Spark与Kafka主题联系起来,正如官方Spark结构化流媒体指南中所报道的那样: 然后 我有以下输出和异常:
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我有一个关于Kafka流上的Spark结构化流的问题。 我有一个模式类型: 我从Kafka主题引导我的流,如下所示: 接下来转换为字符串,字符串类型: 现在我想将value字段(这是一个JSON)转换为之前转换的模式,这将使SQL查询更容易: 看来Spark 2.3.1不知道函数? 这是我的进口: 有没有办法解决这个问题?请注意,我不是在寻找Scala解决方案,而是一个纯粹的基于Java的解决方案
批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)
我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我