当前位置: 首页 > 知识库问答 >
问题:

scala将CassandRatable结果连接到dataframe

巴星华
2023-03-14

我正在使用Datastax spark-Cassandra-connector访问cassandra中的一些数据。我的要求是将RDD与Cassandra表连接起来,获取结果并将其存储在配置单元表中。

com.datastax.spark.connector.rdd.CassandraJoinRDD[org.apache.spark.sql.Row, 
com.datastax.spark.connector.CassandraRow] = 
CassandraJoinRDD[17] at RDD at CassandraRDD.scala:19
val data=joinWithRDD.map{
   case(_, cassandraRow) =>    Row(cassandraRow.columnValues:_*)
}

sqlContext.createDataFrame(data,schema)

我正在犯错误

java.lang.ClassCastException: cannot assign instance of
   scala.collection.immutable.List$SerializationProxy to field 
   org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of 
   type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

你能帮我把JoinWithCassandRatable转换成数据文件吗?

共有1个答案

苏洛城
2023-03-14

正如我所看到的,您在联接的左侧使用了dataframe。与其使用使用RDD API的JoinWithCassandRatable,我建议使用支持Dataframe API中join的Spark Cassandra连接器2.5.x(2.5.1是最新版本),并直接使用它。这非常简单,您只需使用--conf spark.sql.extensions=com.datastax.spark.connector.cassandrasParkExtensions启动您的作业就可以激活此功能,之后,代码只需在Dataframes上使用普通联接:

val parsed = ...some dataframe...
val cassandra = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "stock_info", "keyspace" -> "test"))
  .load

// we can use left join to detect what data is incorrect - if we don't have some data in the
// Cassandra, then symbol field will be null, so we can detect such entries, and do something with that
// we can omit the joinType parameter, in that case, we'll process only data that are in the Cassandra
val joined = parsed.join(cassandra, cassandra("symbol") === parsed("ticker"), "left")
   .drop("ticker")

这里有自述文件的完整源代码。

 类似资料:
  • 我开始使用JOOQ和dvd租赁商店数据库aka sakila。基本上,我想找一个演员和他的角色(我创建的表)。到目前为止,我想到了这个: 我希望能够将演员及其角色提取到一个对象中:DTO。我找到了这篇文章https://arnaudroger.github.io/blog/2017/03/02/jooq-one-to-many-without-dto.html但我发现使用sfm的解决方案过于冗长,

  • 问题内容: 我和用户与GameMap之间存在一对多的关系。一个用户可以拥有许多地图。 用户类别: 但是,有时我需要急于加载地图。为了避免在关闭Session后出现 LazyInitializationException ,我有两种检索Users的方法。 用户存储库: 问题: 但是, 如果表中没有该用户的映射 ,那么JPQL JOIN FETCH变体希望一次加载该用户,并且他的映射返回NULL用户。

  • 我想知道在哪里提到表的列名,这是连接的键。

  • 问题内容: 我和用户与GameMap之间存在一对多的关系。一个用户可以拥有许多地图。 用户类别: 但是,有时我需要急于加载地图。为了避免在关闭Session后出现 LazyInitializationException ,我有两种检索Users的方法。 用户存储库: 问题: 但是, 如果表中没有该用户的映射 ,那么JPQL JOIN FETCH变体希望一次加载该用户,并且他的映射返回NULL用户。

  • 因此,当我从本地计算机执行时,它无法连接到: 但是,我知道如果我将主程序设置为,它就会起作用,因为这样它就会在本地运行。但是,我想让我的客户端连接到这个远程主机。我怎么才能做到?Apache配置看起来是文件。我甚至可以远程登录到该公共DNS和端口,我还为每个实例配置了公共DNS和主机名。我希望能够向这个远程主机提交作业,我错过了什么?

  • 问题内容: 我在AWS中创建了一个3节点(1个主节点,2个工作人员)集群。我可以将作业从主服务器提交到群集,但是我无法使其在远程工作。 我可以从主人那里看到: 因此,当我从本地计算机执行时,它无法连接到: 但是,我知道如果将master设置为,它会起作用,因为那样它将在本地运行。但是,我希望客户端连接到该远程主服务器。我该怎么做?Apache配置外观文件。我什至可以远程登录到该公共DNS和端口,还