当前位置: 首页 > 知识库问答 >
问题:

未找到请求操作的编解码器:[varcharjava.lang.Double],从火花结构插入到Datastax cassandra

李博达
2023-03-14

我使用火花结构流从Kafka消费,并使用Foreach插入Datastax Cassandra。当我插入BigInt和String时,它会插入,但当我插入Double值时,它会抛出“未找到请求操作的编解码器:[varchar

val view_a = VW_MS_PLAN_UNIT_LA
      .writeStream
      .option(WriteConf.IgnoreNullsParam.name, "true")
      .queryName("VIEW PLAN UNIT LA")
      .outputMode("Append")
      .foreach(new CassandraSinkForeach)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()


df.show()

Spark(PrintSchema)a-String b-Bigint C-Double的示例数据帧

示例Cassandra表;-创建表a(a字符串,b bigint,c双精度)

 var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} 
      (a, b, c) values(?, ?, ?)""")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(preparedStatement.bind(${record.getAs[String](0)}, 
           ${record.getAs[BigInt](1)}, ${record.getAs[Double](2)}))
    )
  }

com.datastax.driver.core.exceptions.CodecNotFoundException:未找到用于请求操作的编解码器:[varchar

共有1个答案

微生宝
2023-03-14

再次查看消息后-您的数据与表结构不匹配。只需添加显式转换...

此外,要在DataTax Java驱动程序中使用Scala类型,您可以从Java驱动程序Scala extras存储库中获取编解码器。不幸的是,它并没有“官方”的jar构建,所以您要么需要自己编译和部署代码,要么只需要在项目中包含一些代码。DataTax开发博客上有一篇博文解释了它是如何实现的。

 类似资料: