我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。
线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apache.spark.sql.catalyst.符号$SymbolContextApiImpl.as术语(Symbols.scala:84)org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)org.apache.spark.sql.catalyst.ScalaReflection$.构造参数(ScalaReflection.scala:39)org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)org.apache.spark.sql.catalyst.ScalaReflection$. getConstructorParameters(ScalaReflection.scala:39)ScalaReflection$. org$apache$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection.scala:582)org.apache.spark.sql.catalyst.ScalaReflection$. org$apache$火花$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection.scala:460)scala.reflect.api.ScalaReflection$$anonfun$class.as(ScalaRymbols.scala:199)scala.reflect.internal.ScalaRScalaReflection$. org$apache$火花$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection. scala: 583)在org. apache. sql.催化剂。ScalaReflection$.序列化器对于(ScalaReflection. scala: 425)在org. apache. sql.催化剂.编码器。ExpressionEncoder美元。在org. apache. sql应用(ExpressionEncoder. scala: 61)。编码器美元。产品(Encoders. scala: 274)在org. apache. spark. sql。SQLEncoders. newProductEncoder(SQLIncates. scala: 47)在sun.反射的人消费者$. main(人消费者. scala: 33)在sun.反射的人消费者. main(人消费者. scala)。NativemetodAccessorInm.调用0(本机方法)在sun.反射。N
以下是我的代码...
object PersonConsumer {
import org.apache.spark.rdd.RDD
import com.trueaccord.scalapb.spark._
import org.apache.spark.sql.{SQLContext, SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
def parseLine(s: String): Person =
Person.parseFrom(
org.apache.commons.codec.binary.Base64.decodeBase64(s))
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
val ds4 = spark.sqlContext.sql("select name from persons")
val query = ds4.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
在Person类中,性别是一个枚举,这就是导致此问题的原因。删除此字段后,它工作正常。以下是我从DataBricks的Shixiong(Ryan)那里得到的答案。
问题是“可选的性别性别=3;”。生成的类“性别”是一个特征,Spark不知道如何创建一个特征,因此不支持它。您可以定义SQL编码器支持的类,并将此生成的类转换为parseLine
中的新类。
带val ds3的行应为:
val ds3 = ds2.map(str => parseLine(str))
sqlContext.protoToDataFrame(ds3).registerTempTable("persons")
在将RDD保存为临时表之前,需要将其转换为数据帧。
我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚
批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)
我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s
我试图使用Spark,更具体地说是PySpark和结构化流来消费Kafka。 PY4JJavaError:调用O70时出错。AwaitTermination
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\
我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。 mobEventDF有这样一个模式 有没有更好的方法?如何将其直接映射到下面的Scala Case类中?