当前位置: 首页 > 知识库问答 >
问题:

如何在java中处理Apache spark Streaming中的Json数据

井旺
2023-03-14

嗨,我是阿帕奇星火新用户。我正在学习的路上。我已经从kafka主题为json数据编写了spark streaming。下面是json数据的连续流。但现在我不知道如何使用这个json数据。我使用DataSet和DataFrame来处理Json数据,但遇到了一些错误。请用几个例子来帮助我,如何使用流式传输的数据流。

注意:我使用的是Apache Spark1.6.3版本。

(null{"time":"2017/08/21 18:25:11","model":"20E84fb","speed":"20E84fb","cellId":"0605d822E84fb","course":"146.37E84fb","header":"ST600ALTE84fb","deviceId":206675884,"distance":"166E84fb","longitude":"-099.168493E84fb","latitude":"19.428616E84fb","payload":"ST600ALT+number+;206675884;20;376;20161005;16:26:59;0605d822;334;20;2ee5;63;+19.428616;-099.168493;000.213;146.37;6;1;166;12.21;000000;34;000887;4.4;1;0.00E84fb","date":"2017/08/21 18:25:11E84fb"})

代码:

package datapipeline;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.onosproject.net.Device;

import scala.Tuple2;

public final class SparkConsumer {
    //private static SparkContext sc = new SparkContext();
    private static final Pattern SPACE = Pattern.compile(" ");


    private static void setLogLevels() {
        boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements();
        if (!log4jInitialized) {
            // We first log something to initialize Spark's default logging, then we override the
            // logging level.
            Logger.getLogger(SparkConsumer.class).info("Setting log level to [WARN] for streaming example." +
                    " To override add a custom log4j.properties to the classpath.");
            Logger.getRootLogger().setLevel(Level.WARN);
        }
    }

    public static void main(String[] args) throws Exception {

        String jars[]={"C:\\DeviceStreaming-1.0.0.jar"};


        setLogLevels();


        SparkConf sparkConf = new SparkConf().setAppName("CustomerKafkaConsumerThread")
                .set("spark.local.ip","localhost:9092")
                .setMaster("local[*]").setJars(jars);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf));

        SQLContext sqlContext = new SQLContext(ctx);

        Map<String, Integer> topicMap = new HashMap<>();

        topicMap.put("iot", 10);


        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc,"localhost:2181","SparkConsumer", topicMap,StorageLevel.MEMORY_ONLY());
        messages.print();


        JavaDStream<String> json = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> message) {

                        return message._2();
                    }
                }
            );

        json.foreachRDD(rdd -> {

            //DataFrame df = sqlContext.read().json(rdd);
            DataFrame df=sqlContext.createDataFrame(rdd, Device.class);
            df.registerTempTable("rdd");
            df.filter("cellId");
            /*DataFrame deviceFrame= sqlContext.sql("SELECT cellID FROM rdd where cellId=206675884");
            df.show();
            df.printSchema();

            List<String> list=  deviceFrame.javaRDD().map(row -> row.getString(0)).collect();*/

        });

        jssc.start();
        jssc.awaitTermination();
    }
}

共有1个答案

糜帅
2023-03-14

您可以使用get_json_object函数从JSON中提取数据。

根据指定的json路径从json字符串中提取json对象,并返回提取的json对象的json字符串。如果输入的json字符串无效,它将返回null。

尝试以下操作:

参见:https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/functions.html#get_json_object.org.apache.spark.sql.column,%20java.lang.string)

 类似资料:
  • 下面是一个java类CreateDoc,它从一个作为生产者端的web服务发送到另一个作为消费者端的web服务,其内容类型为:json 下面是类表示 一旦我在消费者端接收到作为json的列表,我就不能将其用作java对象,并且内容类型是数组,其中json嵌套在数组中。 以下是代表: 问题是如何处理这一点,并能够使用数据和表示为列表。

  • 问题内容: 这是异步发送到我的php页面的JSON。本质上,这是一个产品列表,它将插入到我的mySQL数据库中。 我的问题是在PHP中解码JSON。我可以使用``eval’‘函数在js中做到这一点,但是在PHP中,我的努力导致了一系列复杂的爆炸和内爆函数。 我知道php具有内置的json_decode函数,但是在PHP文档中,它们仅显示如何处理数组。 任何建议或帮助都非常感谢 泰勒 问题答案: 如

  • 下面是我得到的API请求的JSON响应。 user={'name':'Siva','address':'my address','pincode':12345,'url':'http://myweb.com/index.php?title=firstname:lastname中间名 由于此JSON响应以user=开头,因此它既不是JSONObject也不是JSONArray。所以我认为这是字符串,

  • 本文向大家介绍如何在Java中处理Selenium中的代理?,包括了如何在Java中处理Selenium中的代理?的使用技巧和注意事项,需要的朋友参考一下 我们可以借助PROXY类在Java中使用Selenium处理代理。

  • 问题内容: 如何用Java 处理? 问题答案: 我不确定“句柄”是什么意思。 您当然可以捕获该错误: 但这很可能是个坏主意,除非您确切地知道自己在做什么。

  • 问题内容: 在开发高度基于XML的Java应用程序时,我最近在Ubuntu Linux上遇到了一个有趣的问题。 我的应用程序使用Java Plugin Framework ,似乎无法将dom4j创建的XML文档转换为Batik的 SVG规范实现。 在控制台上,我了解到发生了错误: 我认为问题是由来自JVM的原始类加载器与插件框架部署的类加载器之间的冲突引起的。 据我所知,不可能为框架指定一个类加载