当前位置: 首页 > 知识库问答 >
问题:

在Apache spark中合并两个列名不同的数据集

林英锐
2023-03-14
"405-048011-62815", "CRC Industries",

"630-0746","Dixon value",

"4444-444","3M INdustries",

"555-55","Dixon coupling valve"

输入数据集2

"222-2222-5555", "Tata",

"7777-88886","WestSide",

"22222-22224","Reliance",

"33333-3333","V industries"

预期输出为

    ----------label1----|------sentence1------|------label2---|------sentence2-----------
    | 405-048011-62815  | CRC Industries      | 222-2222-5555 |                      Tata|
    |        630-0746   |   Dixon value       |   7777-88886  |                  WestSide|
    -------------------------------------------------------------------------------------

`

    List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746","Dixon value"),
                    RowFactory.create("4444-444","3M INdustries"),
                    RowFactory.create("555-55","Dixon coupling valve"));

    StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<String> listStrings = new ArrayList<String>();
    listStrings.add("405-048011-62815");
    listStrings.add("630-0746");

    Dataset<Row> matchFound1=sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
    matchFound1.show();
    listStrings.clear();
    listStrings.add("222-2222-5555");
    listStrings.add("7777-88886");

    List<Row> data2 = Arrays.asList(
            RowFactory.create("222-2222-5555", "Tata"),
            RowFactory.create("7777-88886","WestSide"),
            RowFactory.create("22222-22224","Reliance"),
            RowFactory.create("33333-3333","V industries"));

    StructType schema2 = new StructType(new StructField[] {new StructField("label2", DataTypes.StringType, false,Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });

    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    Dataset<Row> matchFound2=sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
    matchFound2.show();

    //Approach 1
    Dataset<Row> matchFound3=matchFound1.select(matchFound1.col("label1"),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2"));
    System.out.println("After concat");
    matchFound3.show();

    //Approach 2
    Dataset<Row> matchFound4=matchFound1.filter(concat((col("label1")),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2")));
    System.out.println("After concat 2");
    matchFound4.show();`
----------
org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
!Project [label1#0, sentence1#1, label2#10, sentence2#11]
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]


----------
Error for each of the approaches are as follows
Approach 2 error
org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
!Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]

共有1个答案

长孙明知
2023-03-14

希望这对你有用

DF

val pre: Array[String] = Array("CRC Industries", "Dixon value" ,"3M INdustries" ,"Dixon coupling valve")
        val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
        val df1 = sc.parallelize( rea zip pre).toDF("label1","sentence1")

        val preasons2: Array[String] = Array("Tata", "WestSide","Reliance", "V industries")
         val reasonsI2: Array[String] = Array( "222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
        val df2 = sc.parallelize( reasonsI2 zip preasons2 ).toDF("label2","sentence2")

字符串索引器

val indexer = new StringIndexer()
  .setInputCol("label1")
  .setOutputCol("label1Index")

val indexed = indexer.fit(df1).transform(df1)
indexed.show()

val indexer1 = new StringIndexer()
  .setInputCol("label2")
  .setOutputCol("label2Index")

val indexed1 = indexer1.fit(df2).transform(df2)
indexed1.show()
    val rnd_reslt12 = indexed.join(indexed1 , indexed.col("label1Index")===indexed1.col("label2Index")).drop(indexed.col("label1Index")).drop(indexed1.col("label2Index"))
rnd_reslt12.show()

+---------------+--------------------+-------------+------------+
|         label1|           sentence1|       label2|   sentence2|
+---------------+--------------------+-------------+------------+
|       630-0746|         Dixon value|222-2222-5555|        Tata|
|       4444-444|       3M INdustries|  22222-22224|    Reliance|
|         555-55|Dixon coupling valve|   33333-3333|V industries|
|405048011-62815|      CRC Industries|   7777-88886|    WestSide|
+---------------+--------------------+-------------+------------+
 类似资料:
  • 问题内容: 我可以在其他具有相同列名的数据框的右边追加一个数据框吗 问题答案: 您可以像这样连接两个数据框。 如果您正在寻找联盟,则可以执行以下操作。 Spark 2.0,已重命名为

  • 问题内容: 我肯定在这里错过了一些简单的事情。尝试在熊猫中合并具有相同列名的两个数据框,但右侧的数据框具有一些左侧没有的列,反之亦然。 我试着加入外部联接: 但这产生了: 我还指定了一个要连接的单列(例如on =“ id”),但是它复制了除“ id”以外的所有列,例如attr_1_x,attr_1_y,这并不理想。我也将整个列列表(有很多)传递给了“ on”: 产生: 我想念什么?我想获得一个带有

  • 我有两个熊猫数据帧共享一个共同的列名。我想合并公共列名,但保留与第二个dataFrame中的所有不同列,其中公共列名称匹配。下面是两个数据帧的示例: 我希望预期的结果是: 也就是说,当列“A”匹配时,我希望保留I,J,K,L的行,并且不等于“NaN ”,对于DF1中的列也是如此。 我已经尝试了所有的pd.merge选项,但是它们似乎没有做我上面要求的事情。例如, 在“A”上匹配并将所有键保留在左侧

  • 问题内容: 我有两个表(表A和表B)。 它们具有不同的列数-假设表A具有更多列。 如何合并这两个表,并为表B没有的列获取空值? 问题答案: 为具有较少列的表添加额外的列作为null

  • 我肯定错过了一些简单的东西。尝试合并熊猫中的两个数据帧,它们的列名基本相同,但右边的数据帧有一些左边没有的列,反之亦然。 我已尝试使用外部联接进行联接: 但这会产生: 我还指定了一个要连接的列(例如,on="id"),但这会重复所有列,除了"id",如attr_1_x、attr_1_y,这并不理想。我还传递了整个列列表(有很多)到on: 其产生: 我错过了什么?我想得到一个附加了所有行的df,并且

  • 我有两个数据。表X和表Y。 列 X: Y 中的值列: 创建两个data.tables: 我设置了 X 和 Y 的键: 现在,我尝试通过X中的< code>id和Y中的< code>ID来连接X和Y: All引发错误,指出参数中的列名无效。 我查阅了data.table的手册,发现< code>merge函数不支持< code>by.x和< code>by.y参数。 如何在不更改列名的情况下通过不同