当前位置: 首页 > 知识库问答 >
问题:

如何在2.2.0中得到一个给定的Apache Spark Dataframe的Cassandra cql字符串?

农建弼
2023-03-14
TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()

看起来我可以创建一个新的TableDef,但是我必须自己完成整个映射,而且在某些情况下,像ColumnType这样的必要函数在Java中是不可访问的。例如,我试图创建一个新的ColumnDef,如下所示

new ColumnDef("col5", new PartitionKeyColumn(), ColumnType is not accessible in Java)

目的:从Spark DataFrame中获取CQL create语句。

Input My dataframe可以有任意数量的列,这些列具有各自的Spark类型。假设我有一个有100列的Spark Dataframe,其中我的Dataframe的col8、col9对应于cassandra partitionKey列,而我的column10对应于cassandra clustering Key列

col1| col2| ...|col100
create table if not exists test.hello (
   col1 bigint, (whatever column1 type is from my dataframe I just picked bigint randomly)
   col2 varchar,
   col3 double,
   ...
   ...
   col100 bigint,
   primary key(col8,col9)
) WITH CLUSTERING ORDER BY (col10 DESC);

共有1个答案

越嘉茂
2023-03-14

因为必需的组件(PartitionKeyColumn&ColumnType)是Scala中的对象,所以需要使用以下语法来访问它们的内容:

// imports
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.types.TextType$;

// actual code
ColumnDef a = new ColumnDef("col5",  
      PartitionKeyColumn$.MODULE$, TextType$.MODULE$);

请参见ColumnRole和PrimitiveTypes的代码,以查找对象/类名称的完整列表。

更新后的额外要求:代码是冗长的,但应该工作...

SparkSession spark = SparkSession.builder()
                .appName("Java Spark SQL example").getOrCreate();

Set<String> partitionKeys = new TreeSet<String>() {{
                add("col1");
                add("col2");
        }};
Map<String, Integer> clustereingKeys = new TreeMap<String, Integer>() {{
                put("col8", 0);
                put("col9", 1);
        }};

Dataset<Row> df = spark.read().json("my-test-file.json");
TableDef td = TableDef.fromDataFrame(df, "test", "hello", 
                ProtocolVersion.NEWEST_SUPPORTED);

List<ColumnDef> partKeyList = new ArrayList<ColumnDef>();
List<ColumnDef> clusterColumnList = new ArrayList<ColumnDef>();
List<ColumnDef> regColulmnList = new ArrayList<ColumnDef>();

scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
while (iter.hasNext()) {
        ColumnDef col = iter.next();
        String colName = col.columnName();
        if (partitionKeys.contains(colName)) {
                partKeyList.add(new ColumnDef(colName, 
                                PartitionKeyColumn$.MODULE$, col.columnType()));
        } else if (clustereingKeys.containsKey(colName)) {
                int idx = clustereingKeys.get(colName);
                clusterColumnList.add(new ColumnDef(colName, 
                                new ClusteringColumn(idx), col.columnType()));
        } else {
                regColulmnList.add(new ColumnDef(colName, 
                                RegularColumn$.MODULE$, col.columnType()));
        }
}

TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(), 
                (scala.collection.Seq<ColumnDef>) partKeyList,
                (scala.collection.Seq<ColumnDef>) clusterColumnList, 
                (scala.collection.Seq<ColumnDef>) regColulmnList,
                td.indexes(), td.isView());
String cql = newTd.cql();
System.out.println(cql);
 类似资料:
  • 问题内容: 给定一个长度 和a ,如何编写一些Java代码以产生长度为n的所有可能的字符串,其中包含集合中的字符? 对于上面的示例,结果应具有2 ^ 4 = 16个字符串,即: 这是我的代码段: 好像只是在做排列,而不是我想要的。……在此先谢谢您:) 问题答案: 以与计数相同的方式来考虑它。从技术上讲,您是从aaaa到bbbb进行“计数”,就像二进制一样。 没有看到您尝试过的内容,我不能为您提供更

  • 我有一个varchar字段,存储类似的日期 我需要将这些值更改为

  • 问题内容: 文字为: 我只想得到 我试过了 : 但这给了我: 问题答案: 正则表达式是不必要的。只需使用或即可。

  • 问题内容: 当我尝试将[] byte编组为JSON格式时,我只有一个奇怪的字符串。 请看下面的代码。 我有两个疑问: 如何将[] bytes封送至JSON? 为什么[] byte成为此字符串? 输出为: 戈朗游乐场:https : //play.golang.org/p/wanppBGzNR 问题答案: 根据文档:https : //golang.org/pkg/encoding/json/#Ma

  • 问题内容: 我正在尝试获取给定月份的第一个星期一。 我想出的最好方法是在前7天循环浏览,然后在返回时返回。有一个更好的方法吗? 问题答案: 通过查看时间的.Weekday(),您可以计算第一个星期一。

  • 问题内容: 我有两个字符串,我想检查第一个是否是另一个的子字符串。Python是否具有这样的内置功能? 问题答案: 尝试像这样使用: