当前位置: 首页 > 知识库问答 >
问题:

如何使用Datastax连接器从Spark Dataframe更新特定的Cassandra列集

夏侯阳
2023-03-14

我有一个只有几个列的Cassandra表,我想更新其中的一个(对于多个列,还有什么?)来自Spark 2.4.0。但是如果我没有提供所有的列,那么记录就不会得到更新。

Cassandra模式:

rowkey,message,number,timestamp,name
1,hello,12345,12233454,ABC
finalDF.select("rowkey","current_ts")
  .withColumnRenamed("current_ts","timestamp")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_data", "keyspace" -> "ks_data"))
  .mode("overwrite")
  .option("confirm.truncate","true")
  .save()
finalDF=
rowkey,current_ts
1,12233999
rowkey,message,number,timestamp,name
1,hello,12345,12233999,ABC

共有1个答案

慕兴平
2023-03-14

说明是savemode用于指定将DataFrame保存到数据源的预期行为。(不仅适用于C*,而且适用于任何数据源)。可用的选项有

  1. savemode.errorifexists
  2. savemode.append
  3. savemode.overwrite
  4. 保存。忽略

在本例中,由于您已经有了数据,并且希望追加数据,因此必须使用savemode.append

import org.apache.spark.sql.SaveMode

finalDF.select("rowkey","current_ts")
  .withColumnRenamed("current_ts","timestamp")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_data", "keyspace" -> "ks_data"))
  .mode(SaveMode.Append)
  .option("confirm.truncate","true")
  .save()

请参阅SaveModes上的spark文档

 类似资料:
  • 我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这

  • 我刚开始用cassandra和java以及spring REST库进行实验。 这里是pom.xml

  • 我已经启动了spark-thrift服务器,并使用Beeline连接到thrift服务器。当尝试查询时,创建一个表在hive转移,我得到以下错误。 cassandra不是有效的Spark SQL数据源。 0:jdbc:hive2:/localhost:10000>select*from traveldata.employee_details;

  • 问题内容: 我正在尝试更新所有以’agg%’和column_name=’%userid%’之类的字符串开头的表…但是即使我能够找到选择具有特定列的所有表的选项,我也看不到在线的此类示例名称和表名称我需要执行相同的操作来更新这些表,如下所示: 帮助将不胜感激。 谢谢。 问题答案: 获取您条件的更新查询 执行

  • 我试图理解Datastax Cassandra驱动程序中的连池,因此我可以更好地在我的Web服务中使用它。 我有留档的1.0版。它说: Java驱动程序异步使用连接,因此可以在同一个连接上同时提交多个请求。 他们通过连接理解什么?当连接到集群时,我们有:一个生成器、一个集群和一个会话。他们中的哪一个是连接? 例如,有一个参数: MaxSimultaneousRecestsPerConnection