当前位置: 首页 > 面试题库 >

DataFrame到RDD [(String,String)]转换

熊博远
2023-03-14
问题内容

我想
在Databricks中将转换org.apache.spark.sql.DataFrameorg.apache.spark.rdd.RDD[(String, String)]
有人可以帮忙吗?

背景
(也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。

更具体地说 ,输入的类型是:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试放入Redis,如下所示:

sc.toRedisKV(lastContacts)(redisConfig)

错误消息如下所示:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

我已经玩过一些想法(例如function .rdd),但是没有一个帮助。


问题答案:

如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。

例如:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

有关 AnalysisException,
请参阅https://community.hortonworks.com/questions/106500/error-in-spark-
streaming-kafka-integration-structu.html
:必须使用writeStream.start()执行带有流源的查询

您需要等待使用查询终止查询。 awaitTermination() 防止查询活动时退出进程。



 类似资料:
  • 我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。

  • 我在pandas中有一个数据帧,它混合了int和str数据列。我想先连接dataframe中的列。为此,我必须将列转换为。我试着做了如下工作: 或 但在这两种情况下,它都不起作用,我得到一个错误,说“不能连接'str'和'int'对象”。将两个列串联在一起可以很好地工作。

  • 我试图使用以下公式将Future[Seq[(String,String)]转换为Future[Seq[(String)]: 所以 sortedSeq 是 Future[Seq[(String, String)]] 但我一直得到错误: 我做错了什么?

  • 问题内容: 我正在从String变量的Restful api获取数据,现在我想转换为JSON对象,但是在转换时会引发异常,这是我的问题。这是我的代码: 我的字符串包含 这是我想要在json中使用的字符串,但它向我显示了线程“ main”中的异常 问题答案: 的是本内的。您需要按层次分析JSON,以便能够正确获取数据。 注意:本示例使用类而不是。 正如“ Matthew”在他正在使用的注释中提到的那

  • 现在这是一个已经经历了很多堆栈溢出的线程。这也被其他网站所覆盖,但我仍然无法完全理解。我从这个网站上读到了不同的东西,我在底部链接它们,但现在是问题。 像往常一样,我尝试从文件中读取行: 我不明白为什么我没有得到一个IO[字符串]- 有没有办法在类型上耍花招,可以访问列表中的每个元素,然后创建一个新的纯元素 或者,我仍然可以使用类型执行基本的列表操作(但是到目前为止,我的操作失败了) 我读过的一些

  • 问题内容: 我正在尝试将Spark RDD转换为DataFrame。我已经看到了将方案传递给函数的文档和示例 。 但是我有38列或字段,并且这将进一步增加。如果我手动给出指定每个字段信息的架构,那将是一件非常繁琐的工作。 还有其他方法可以指定模式,而无需事先了解各列的信息。 问题答案: 看到, 在Spark中有两种将RDD转换为DF的方法。 和 我将向您展示如何动态地做到这一点。 toDF() 该