当前位置: 首页 > 知识库问答 >
问题:

带有Spark Cassandra连接器的mapValues

冉伯寅
2023-03-14
sc.cassandraTable("keyspace","table")
  .select("gender","name")
  .mapValues(v => v.get())
  .lookup("Male")
error: value mapValues is not a member of com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow]
  • Java 1.8.0_151
  • Spark 2.2.1
  • Scala 2.11
  • 卡桑德拉3.11.1

共有1个答案

花和宜
2023-03-14

好的。我用这个方法解决了问题,使用问题中的注释:

sc.cassandraTable[(String,String)]("keyspace","table")
  .where("gender = 'Male'")
  .select("gender","name")
  .map{case (k,v) => (v,1)}
  .reduceByKey{case (v,count) => count + count}
  .collect.foreach(println)

解决方案的关键是Spark中Cassandra行和Scala类型之间的类型转换CassandRatable[(String,String)]

谢谢你。

 类似资料:
  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 设置: 我安装了Postresql(11.10版)和TimescaleDB(1.7.1版)扩展。我有2个表,我想用安装在Kafka Connect上的Debezium(ver1.3.1)连接器监视它们,目的是CDC(捕获数据更改)。 表是table1和table2hyper,但table2hyper是hypertable。 在Kafka Connect中创建Debezium连接器后,我可以看到创建

  • 我知道如何删除Kafka连接器,如此处所述 Kafka Connect - 如何删除连接器 但我不确定它是否也删除/擦除特定的连接器相关的配置,偏移量和状态从*。sorage.topic该工作者? 例如:假设我删除了一个连接器名为“connector-abc-1.0.0”的连接器,Kafka connect worker以下面的配置启动。 现在,在该连接器的DELETE调用之后,它是否会删除该特定

  • 我正在尝试将kafka-jdbc连接器(源代码和接收器)与非常旧的数据库(cloudscape)一起使用。我有这个数据库的 JDBC 驱动程序。我将驱动程序放在Confluent(版本5)的“/share/java/kafka/connect/jdbc”文件夹中,并创建了属性文件。 启动连接器时,日志如下: 我想JDBC驱动程序很旧(它使用JAVA1.3)这一事实存在问题。驱动程序使用RMI协议进

  • 我需要为REST API创建kafka源连接器,并使用头验证,如curl-H“Authorization:Basic”-H“clientID:”“https: 我还尝试使用“connector.class”:“com.tm.kafka.connect.rest.RestSourceConnector”,我的joson文件如下 但没有希望。知道如何通过身份验证获取RESTAPI数据。我的身份验证参数

  • 在阅读了Tomcat NIO连接器之后,我仍然不明白一件事:如果应用程序代码阻塞,即它阻塞从数据库读取、读取文件系统和调用外部web服务,那么NIO连接器是否有益? 例如,您有一个类似REST的API,它接收一个请求,从数据库中读取一些内容,并返回一个响应。它不使用Servlet3异步,它只是写入响应。 我没有找到NIO连接器使用的线程池的完整描述,但我认为它有一个处理请求的线程池,因此每个请求最