有人能用Java代码将下面的JSON转换为Spark DataFrame吗…
注意:它不是文件
逻辑:听kafka主题T1,读取RDD中的每个记录,并应用附加逻辑将结果数据转换为Json对象,并将其写入kafka中的另一主题T2。
[
{
"@tenant_id":"XYZ",
"alarmUpdateTime":1526342400000,
"alarm_id":"AB5C9123",
"alarm_updates":[
{
"alarmField":"Severity",
"new_value":"Minor",
"old_value":"Major"
},
{
"alarmField":"state",
"new_value":"UPDATE",
"old_value":"NEW"
}
],
"aucID":"5af83",
"inID":"INC15234567",
"index":"test",
"product":"test",
"source":"ABS",
"state":"NEW"
}
]
ClassAlarm{
String @tenant_id;
String alarm_id;
.
.
List <AlarmUpdate> update;
Get and Setter functions for all variables
}
AlarmUpdate{
String alarmField;
String oldVal;
String NewVal;
Get and Setter functions for all variables
}
AppClass{
void static main(){
Alarm alarmObj = new Alarm();
//set values for variables in alarmObj.
Dataset <Row> results = jobCtx.getSparkSession().createDataFrame(Arrays.asList(alarmObj), Alarm.class)
//At this point seeing following errors.
}
}
您可以使用WholeTextFiles
读取json文件,获取json文本,并将其作为SparkSession
的json
api使用
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
static SparkSession spark = SparkSession.builder().master("local").appName("simple").getOrCreate();
static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<Row> df = spark.read().json(sc.wholeTextFiles("path to json file").map(t -> t._2()));
df.show(false);
你应该得到
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|@tenant_id|alarmUpdateTime|alarm_id|alarm_updates |aucID|inID |index|product|source|state|
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|XYZ |1526342400000 |AB5C9123|[[Severity,Minor,Major], [state,UPDATE,NEW]]|5af83|INC15234567|test |test |ABS |NEW |
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
您可以根据需要使用master
和appname
String t1Record = "[\n" +
" {\n" +
" \"@tenant_id\":\"XYZ\",\n" +
" \"alarmUpdateTime\":1526342400000,\n" +
" \"alarm_id\":\"AB5C9123\",\n" +
" \"alarm_updates\":[\n" +
" {\n" +
" \"alarmField\":\"Severity\",\n" +
" \"new_value\":\"Minor\",\n" +
" \"old_value\":\"Major\"\n" +
" },\n" +
" {\n" +
" \"alarmField\":\"state\",\n" +
" \"new_value\":\"UPDATE\",\n" +
" \"old_value\":\"NEW\"\n" +
" }\n" +
" ],\n" +
" \"aucID\":\"5af83\",\n" +
" \"inID\":\"INC15234567\",\n" +
" \"index\":\"test\",\n" +
" \"product\":\"test\",\n" +
" \"source\":\"ABS\",\n" +
" \"state\":\"NEW\"\n" +
" }\n" +
"]";
JavaRDD<String> t1RecordRDD = sc.parallelize(Arrays.asList(t1Record));
Dataset<Row> df = spark.read().json(t1RecordRDD);
我有一个关于使用jolt将平面json转换成嵌套json的问题。我对jolt很陌生,这是我的意见 我编写了jolt spec,但我没有得到想要的输出 我的预期产出是: 任何震动专家都可以帮助我获得所需的输出。我应该在颠簸中使用多个变换,还是可以在一个震动变压器中获得所需的输出?
问题内容: 有人可以提供一个示例或参考,该示例或参考提供一种使用Jackson库将嵌套JAVA对象转换为JSON输出的方法的方法。我没有转换平面JAVA对象的问题。但是,JSON库显示嵌套的对象名称和类型,而不是其子对象。我几乎可以利用此处提供的相同代码http://www.mkyong.com/java/jackson-2-convert- java-object-to-from-json/ 。
这个问题被问了很多次,但我找不到解决问题的答案。 我试图将嵌套的JSON格式转换为CSV格式如下: JSON结构是任意的,可以是任何东西,嵌套或不嵌套。 我不应该知道它,这是一个数据库答案,我需要将这个JSON答案导出到CSV文件中。 下面是一个例子 输入: 我想要的结果是: 这是一个例子,它可以是任何其他JSON文档。 这里的想法是在CSV列名中使用点表示法。 我已经使用了CDL,但输出不是我想
我试图根据第二个嵌套数组中的值的数量将嵌套数组转换为对象。我似乎无法获取值字段的数量并将其用作规范中的键。现在这是我的输入JSON文件: 这是我想要的JSON输出: 这是我目前的规格 有人有类似的情况吗?
问题内容: 我是Python和Pandas的新手。我正在尝试将Pandas Dataframe转换为嵌套的JSON。函数.to_json()不能为我的目标提供足够的灵活性。 以下是数据框的一些数据点(在csv中,以逗号分隔): 有很多重复的信息,我想要一个这样的JSON: 我怎样才能做到这一点? 编辑: 再现数据帧的代码: 问题答案: 更新: 结果(格式化): 旧答案: 你可以用它做的,和方法:
我有一个JSON如下所示: 为什么在输出中看不到Level1、Level2?请有人帮忙,我想看看在输出和输入太相似了。