数据集(每行三列,以制表符分隔:机构ID、创建时间、JSON 字符串;其中 info_map 字段的值是以字符串形式存放的 JSON 数组)
NJBK 2022-04-07 17:13:52 {"inst_id":"NJBK","info_map":"[{\"foreign_loan_prod_code_name\":\"单期贷\",\"ant_credit_prod_code\":\"A91\"},{\"foreign_loan_prod_code_name\":\"分期贷\",\"ant_credit_prod_code\":\"B92\"}]"}
BJBK 2022-04-01 15:11:45 {"inst_id":"BJBK","info_map":"[{\"foreign_loan_prod_code_name\":\"单期贷\",\"ant_credit_prod_code\":\"A91\"},{\"foreign_loan_prod_code_name\":\"分期贷\",\"ant_credit_prod_code\":\"B92\"}]"}
Scala 版 JSON 解析代码(两种写法:先解析成样例类再用 flatMap 展开,或直接在 flatMap 中解析;二者是同名 object FlinkJson 的替代实现,不能放在同一个源文件中)
import java.util
import org.apache.flink.api.common.state._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject, parser}
import org.apache.flink.table.descriptors.Json
import org.apache.flink.util.{Collector,CollectionUtil}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction}
/**
 * One parsed input line: the institution id (first tab-separated column), the creation date
 * (first 10 characters of the timestamp column, e.g. `2022-04-07`) and the JSON array decoded
 * from the `info_map` field of the JSON column.
 *
 * The field name `jsonAarray` (sic) is kept unchanged because code accesses it by name.
 */
case class JsonData( orgId: String, timeCreate: String, jsonAarray: JSONArray )
object FlinkJson {
  import scala.util.Try

  /** Input file read when no path is given as the first program argument. */
  private val DefaultInputPath = "/Users/IdeaProjects/flinktest1/src/main/resources/json.txt"

  /**
   * Reads tab-separated lines `orgId<TAB>createTime<TAB>json`, turns each into a [[JsonData]]
   * and prints one `(orgId, date, foreign_loan_prod_code_name, ant_credit_prod_code)` tuple per
   * object in the `info_map` array.
   *
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)

    val dataStream = inputStream
      // Lines that cannot be parsed are dropped instead of failing the whole job.
      .flatMap((line: String) => parseLine(line).toList)
      .flatMap((in: JsonData, out: Collector[(String, String, String, String)]) => emitProducts(in, out))

    dataStream.print()
    env.execute("state test")
  }

  /**
   * Parses `orgId<TAB>createTime<TAB>json` into a [[JsonData]], keeping the first 10 characters
   * of the timestamp (e.g. `2022-04-07`).
   *
   * @return `None` for lines with fewer than three fields, a timestamp shorter than 10 characters,
   *         unparseable JSON, or a missing / non-array `info_map`
   */
  private def parseLine(line: String): Option[JsonData] = {
    val fields = line.split("\t")
    if (fields.length < 3 || fields(1).length < 10) None
    else
      for {
        obj     <- Try(JSON.parseObject(fields(2))).toOption.flatMap(Option(_))
        // info_map holds the array as a JSON *string*, so it needs a second parse.
        rawInfo <- Option(obj.getString("info_map"))
        items   <- Try(JSON.parseArray(rawInfo)).toOption.flatMap(Option(_))
      } yield JsonData(fields(0), fields(1).substring(0, 10), items)
  }

  /**
   * Emits one `(orgId, date, product name, product code)` tuple for every element of
   * `in.jsonAarray` that can be read as a JSON object; other elements are skipped.
   */
  private def emitProducts(in: JsonData, out: Collector[(String, String, String, String)]): Unit = {
    val items = in.jsonAarray
    for (i <- 0 until items.size()) {
      // getJSONObject also converts elements stored as maps or JSON strings; anything else fails
      // and is skipped rather than crashing the job.
      Try(items.getJSONObject(i)).toOption.flatMap(Option(_)).foreach { item =>
        out.collect((in.orgId, in.timeCreate,
          item.getString("foreign_loan_prod_code_name"),
          item.getString("ant_credit_prod_code")))
      }
    }
  }
}
import org.apache.flink.api.common.state._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject, parser}
import org.apache.flink.util.Collector
object FlinkJson {
  import scala.util.Try

  /** Input file read when no path is given as the first program argument. */
  private val DefaultInputPath = "/Users/IdeaProjects/flinktest1/src/main/resources/json.txt"

  /**
   * Reads tab-separated lines `orgId<TAB>createTime<TAB>json` and, in a single flatMap, prints one
   * `(orgId, date, foreign_loan_prod_code_name, ant_credit_prod_code)` tuple per object in the
   * `info_map` array.
   *
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)

    // Parse and flatten in one step.
    val dataStream = inputStream
      .flatMap((line: String, out: Collector[(String, String, String, String)]) => explodeLine(line, out))

    dataStream.print()
    env.execute("state test")
  }

  /**
   * Parses one input line and emits a tuple for every JSON object in its `info_map` array.
   * Lines with fewer than three fields, a timestamp shorter than 10 characters, unparseable JSON
   * or a missing / non-array `info_map` emit nothing instead of failing the job.
   */
  private def explodeLine(line: String, out: Collector[(String, String, String, String)]): Unit = {
    val fields = line.split("\t")
    if (fields.length >= 3 && fields(1).length >= 10) {
      val orgId = fields(0)
      val timeCreate = fields(1).substring(0, 10) // keep the yyyy-MM-dd prefix

      val infoMap = for {
        obj     <- Try(JSON.parseObject(fields(2))).toOption.flatMap(Option(_))
        // info_map holds the array as a JSON *string*, so it needs a second parse.
        rawInfo <- Option(obj.getString("info_map"))
        items   <- Try(JSON.parseArray(rawInfo)).toOption.flatMap(Option(_))
      } yield items

      infoMap.foreach { items =>
        for (i <- 0 until items.size()) {
          // getJSONObject returns object elements directly (no re-serialize/re-parse) and also
          // converts maps or JSON strings; elements it cannot convert are skipped.
          Try(items.getJSONObject(i)).toOption.flatMap(Option(_)).foreach { item =>
            out.collect((orgId, timeCreate,
              item.getString("foreign_loan_prod_code_name"),
              item.getString("ant_credit_prod_code")))
          }
        }
      }
    }
  }
}
Java 版 JSON 解析代码
import com.alibaba.fastjson.*;//(JSON, JSONArray, JSONObject, parser)
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.lang.Integer;
import java.sql.Time;
import java.text.SimpleDateFormat;
public class FlinkJson {
    /** Input file read when no path is given as the first program argument. */
    private static final String DEFAULT_INPUT_PATH = "/Users/IdeaProjects/flinktest1/src/main/resources/json.txt";

    /**
     * Only rows whose creation date (yyyy-MM-dd) is on or after this value are printed.
     * ISO-formatted dates compare correctly as plain strings. Note that both sample rows
     * (dated 2022-04-xx) fall before this threshold and are therefore filtered out.
     */
    private static final String MIN_CREATE_DATE = "2022-05-01";

    /**
     * Reads tab-separated lines {@code orgId<TAB>createTime<TAB>json}, emits one
     * (orgId, date, foreign_loan_prod_code_name, ant_credit_prod_code) tuple per object in the
     * {@code info_map} array, keeps rows created on or after {@link #MIN_CREATE_DATE} and prints them.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment is a static factory; no instance is needed to call it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStreamSource<String> inputStream = env.readTextFile(inputPath);

        SingleOutputStreamOperator<Tuple4<String, String, String, String>> dataStream = inputStream
                .flatMap(FlinkJson::explodeLine)
                // The Collector's generic type is erased for lambdas/method references,
                // so the output type must be declared explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.STRING))
                .filter(r -> r.f1.compareTo(MIN_CREATE_DATE) >= 0);

        dataStream.print();
        env.execute();
    }

    /**
     * Parses one input line and emits exactly one tuple for every JSON object in its
     * {@code info_map} array. Lines with fewer than three tab-separated fields, a timestamp
     * shorter than 10 characters, unparseable JSON or a missing {@code info_map} emit nothing
     * instead of failing the job.
     */
    private static void explodeLine(String line, Collector<Tuple4<String, String, String, String>> out) {
        String[] words = line.split("\t");
        if (words.length < 3 || words[1].length() < 10) {
            return;
        }
        String orgId = words[0];
        String timeCreate = words[1].substring(0, 10); // keep the yyyy-MM-dd prefix

        JSONArray jsonArray;
        try {
            JSONObject jsonObj = JSON.parseObject(words[2]);
            // info_map holds the array as a JSON string, hence the second parse step.
            String infoMap = jsonObj == null ? null : jsonObj.getString("info_map");
            jsonArray = infoMap == null ? null : JSON.parseArray(infoMap);
        } catch (JSONException e) {
            return; // malformed JSON: skip the line
        }
        if (jsonArray == null) {
            return;
        }

        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject item;
            try {
                // Returns object elements directly (no re-serialize/re-parse) and also converts
                // elements stored as maps or JSON strings.
                item = jsonArray.getJSONObject(i);
            } catch (JSONException | ClassCastException e) {
                continue; // element is not a JSON object
            }
            if (item != null) {
                out.collect(Tuple4.of(orgId, timeCreate,
                        item.getString("foreign_loan_prod_code_name"),
                        item.getString("ant_credit_prod_code")));
            }
        }
    }
}