当前位置: 首页 > 知识库问答 >
问题:

将复杂的嵌套Json转换为JAVA中的Spark Dataframe

申屠喜
2023-03-14

有人能用Java代码将下面的JSON转换为Spark DataFrame吗…

注意:它不是文件

逻辑:听kafka主题T1,读取RDD中的每个记录,并应用附加逻辑将结果数据转换为Json对象,并将其写入kafka中的另一主题T2。

 [  
   {  
      "@tenant_id":"XYZ",
      "alarmUpdateTime":1526342400000,
      "alarm_id":"AB5C9123",
      "alarm_updates":[  
         {  
            "alarmField":"Severity",
            "new_value":"Minor",
            "old_value":"Major"
         },
         {  
            "alarmField":"state",
            "new_value":"UPDATE",
            "old_value":"NEW"
         }
      ],
      "aucID":"5af83",
      "inID":"INC15234567",
      "index":"test",
      "product":"test",
      "source":"ABS",
      "state":"NEW"
   }
]
    ClassAlarm{

        String @tenant_id;
        String alarm_id;
        .
        .
        List <AlarmUpdate> update;
        Get and Setter functions for all variables
    }

AlarmUpdate{

    String alarmField;
    String oldVal;
    String NewVal;

    Get and Setter functions for all variables
} 

AppClass{


     void static main(){
             Alarm alarmObj = new Alarm();
          //set values for variables in alarmObj.
           Dataset <Row> results = jobCtx.getSparkSession().createDataFrame(Arrays.asList(alarmObj), Alarm.class)

           //At this point seeing following errors.

      }

}

共有1个答案

左丘边浩
2023-03-14

您可以使用WholeTextFiles读取json文件,获取json文本,并将其作为SparkSessionjsonapi使用

import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

static SparkSession spark = SparkSession.builder().master("local").appName("simple").getOrCreate();
static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

Dataset<Row> df = spark.read().json(sc.wholeTextFiles("path to json file").map(t -> t._2()));
df.show(false);

你应该得到

+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|@tenant_id|alarmUpdateTime|alarm_id|alarm_updates                               |aucID|inID       |index|product|source|state|
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|XYZ       |1526342400000  |AB5C9123|[[Severity,Minor,Major], [state,UPDATE,NEW]]|5af83|INC15234567|test |test   |ABS   |NEW  |
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+

您可以根据需要使用masterappname

    String t1Record = "[\n" +
            "  {\n" +
            "    \"@tenant_id\":\"XYZ\",\n" +
            "    \"alarmUpdateTime\":1526342400000,\n" +
            "    \"alarm_id\":\"AB5C9123\",\n" +
            "    \"alarm_updates\":[\n" +
            "      {\n" +
            "        \"alarmField\":\"Severity\",\n" +
            "        \"new_value\":\"Minor\",\n" +
            "        \"old_value\":\"Major\"\n" +
            "      },\n" +
            "      {\n" +
            "        \"alarmField\":\"state\",\n" +
            "        \"new_value\":\"UPDATE\",\n" +
            "        \"old_value\":\"NEW\"\n" +
            "      }\n" +
            "    ],\n" +
            "    \"aucID\":\"5af83\",\n" +
            "    \"inID\":\"INC15234567\",\n" +
            "    \"index\":\"test\",\n" +
            "    \"product\":\"test\",\n" +
            "    \"source\":\"ABS\",\n" +
            "    \"state\":\"NEW\"\n" +
            "  }\n" +
            "]";
    JavaRDD<String> t1RecordRDD = sc.parallelize(Arrays.asList(t1Record));
    Dataset<Row> df = spark.read().json(t1RecordRDD);
 类似资料:
  • 我有一个关于使用jolt将平面json转换成嵌套json的问题。我对jolt很陌生,这是我的意见 我编写了jolt spec,但我没有得到想要的输出 我的预期产出是: 任何震动专家都可以帮助我获得所需的输出。我应该在颠簸中使用多个变换,还是可以在一个震动变压器中获得所需的输出?

  • 问题内容: 有人可以提供一个示例或参考,该示例或参考提供一种使用Jackson库将嵌套JAVA对象转换为JSON输出的方法的方法。我没有转换平面JAVA对象的问题。但是,JSON库显示嵌套的对象名称和类型,而不是其子对象。我几乎可以利用此处提供的相同代码http://www.mkyong.com/java/jackson-2-convert- java-object-to-from-json/ 。

  • 这个问题被问了很多次,但我找不到解决问题的答案。 我试图将嵌套的JSON格式转换为CSV格式如下: JSON结构是任意的,可以是任何东西,嵌套或不嵌套。 我不应该知道它,这是一个数据库答案,我需要将这个JSON答案导出到CSV文件中。 下面是一个例子 输入: 我想要的结果是: 这是一个例子,它可以是任何其他JSON文档。 这里的想法是在CSV列名中使用点表示法。 我已经使用了CDL,但输出不是我想

  • 我试图根据第二个嵌套数组中的值的数量将嵌套数组转换为对象。我似乎无法获取值字段的数量并将其用作规范中的键。现在这是我的输入JSON文件: 这是我想要的JSON输出: 这是我目前的规格 有人有类似的情况吗?

  • 问题内容: 我是Python和Pandas的新手。我正在尝试将Pandas Dataframe转换为嵌套的JSON。函数.to_json()不能为我的目标提供足够的灵活性。 以下是数据框的一些数据点(在csv中,以逗号分隔): 有很多重复的信息,我想要一个这样的JSON: 我怎样才能做到这一点? 编辑: 再现数据帧的代码: 问题答案: 更新: 结果(格式化): 旧答案: 你可以用它做的,和方法:

  • 我有一个JSON如下所示: 为什么在输出中看不到Level1、Level2?请有人帮忙,我想看看在输出和输入太相似了。