当前位置: 首页 > 知识库问答 >
问题:

使用Spark 2.0.2、Kafka源和calapb进行结构化流

傅乐湛
2023-03-14

我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。

线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apache.spark.sql.catalyst.符号$SymbolContextApiImpl.as术语(Symbols.scala:84)org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)org.apache.spark.sql.catalyst.ScalaReflection$.构造参数(ScalaReflection.scala:39)org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)org.apache.spark.sql.catalyst.ScalaReflection$. getConstructorParameters(ScalaReflection.scala:39)ScalaReflection$. org$apache$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection.scala:582)org.apache.spark.sql.catalyst.ScalaReflection$. org$apache$火花$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection.scala:460)scala.reflect.api.ScalaReflection$$anonfun$class.as(ScalaRymbols.scala:199)scala.reflect.internal.ScalaRScalaReflection$. org$apache$火花$sql$催化剂$ScalaReflection$$序列化器对于(ScalaReflection. scala: 583)在org. apache. sql.催化剂。ScalaReflection$.序列化器对于(ScalaReflection. scala: 425)在org. apache. sql.催化剂.编码器。ExpressionEncoder美元。在org. apache. sql应用(ExpressionEncoder. scala: 61)。编码器美元。产品(Encoders. scala: 274)在org. apache. spark. sql。SQLEncoders. newProductEncoder(SQLIncates. scala: 47)在sun.反射的人消费者$. main(人消费者. scala: 33)在sun.反射的人消费者. main(人消费者. scala)。NativemetodAccessorInm.调用0(本机方法)在sun.反射。N

以下是我的代码...

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

共有2个答案

公良英资
2023-03-14

在Person类中,性别是一个枚举,这就是导致此问题的原因。删除此字段后,它工作正常。以下是我从DataBricks的Shixiong(Ryan)那里得到的答案。

问题是“可选的性别性别=3;”。生成的类“性别”是一个特征,Spark不知道如何创建一个特征,因此不支持它。您可以定义SQL编码器支持的类,并将此生成的类转换为parseLine中的新类。

蓬高谊
2023-03-14

带val ds3的行应为:

val ds3 = ds2.map(str => parseLine(str))

sqlContext.protoToDataFrame(ds3).registerTempTable("persons")

在将RDD保存为临时表之前,需要将其转换为数据帧。

 类似资料:
  • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

  • 批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)

  • 我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我试图使用Spark,更具体地说是PySpark和结构化流来消费Kafka。 PY4JJavaError:调用O70时出错。AwaitTermination

  • 我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。 mobEventDF有这样一个模式 有没有更好的方法?如何将其直接映射到下面的Scala Case类中?