我使用火花结构流从Kafka消费,并使用Foreach插入Datastax Cassandra。当我插入BigInt和String时,它会插入,但当我插入Double值时,它会抛出“未找到请求操作的编解码器:[varchar
val view_a = VW_MS_PLAN_UNIT_LA
.writeStream
.option(WriteConf.IgnoreNullsParam.name, "true")
.queryName("VIEW PLAN UNIT LA")
.outputMode("Append")
.foreach(new CassandraSinkForeach)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
df.show()
Spark(PrintSchema)a-String b-Bigint C-Double的示例数据帧
示例Cassandra表;-创建表a(a字符串,b bigint,c双精度)
var cassandraDriver: CassandraDriver = null;
var preparedStatement: PreparedStatement = null;
def open(partitionId: Long, version: Long): Boolean = {
// open connection
println(s"Open connection")
cassandraDriver = new CassandraDriver();
preparedStatement = cassandraDriver.connector.withSessionDo(session =>
session.prepare(s"""
insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}
(a, b, c) values(?, ?, ?)""")
true
}
def process(record: org.apache.spark.sql.Row) = {
println(s"Process new $record")
cassandraDriver.connector.withSessionDo(session =>
session.execute(preparedStatement.bind(${record.getAs[String](0)},
${record.getAs[BigInt](1)}, ${record.getAs[Double](2)}))
)
}
com.datastax.driver.core.exceptions.CodecNotFoundException:未找到用于请求操作的编解码器:[varchar
再次查看消息后-您的数据与表结构不匹配。只需添加显式转换...
此外,要在DataTax Java驱动程序中使用Scala类型,您可以从Java驱动程序Scala extras存储库中获取编解码器。不幸的是,它并没有“官方”的jar构建,所以您要么需要自己编译和部署代码,要么只需要在项目中包含一些代码。DataTax开发博客上有一篇博文解释了它是如何实现的。
我正在使用以下带有java8的datastax版本 我的表有一个日期列,如下所示 当我试图将数据保存到C*表中时,如下所示 获取错误: 我在做什么?怎么解决这个?有什么需要帮忙的吗...
我正在使用Cassandra-Driver-Core-3.0.0-1。
我的实体有一个字段 当我通过Spring Data repository查询该对象时,我得到了一个异常 CodeNotFoundException:未找到请求操作的编解码器:[set<->java.util.set] 可能是带有删除类型的东西。如何以最小的努力来解决这个问题?
我刚从Spark开始。我已经用Spark安装了CDH5。然而,当我尝试使用sparkcontext时,它给出了如下错误 我对此进行了研究,发现了错误:未找到:值sc 并试图启动火花上下文。/Spark-shell。它给错误
我有一个简单的类叫做Signal。课程内容如下: 我试图在MongoDB(v3.4)插入信号。我使用以下方法插入: 我得到了以下例外: org.bson.codecs.configuration.代码配置异常:找不到in.co.mysite.webapi.models.Signal类的编解码器。 我在这里检查了一个类似的问题,但是插入代码不同。我从回答中得到提示,修改了我的方法,但看起来不干净。修