当前位置: 首页 > 知识库问答 >
问题:

从Kafka流解析JSON RDD时的Java执行

龚志文
2023-03-14

我正在尝试使用Spark stream库从kafka读取一个json字符串。该代码能够连接到kafka broker,但在解码消息时失败。代码的灵感来自

https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/kafkastreamingjson.scala

val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
         StringDecoder](ssc, kParams, kTopic).map(_._2)
  println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>

   if (rdd.toLocalIterator.nonEmpty) {

          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            sqlContext.read.json(rdd).registerTempTable("mytable")
            if (firstTime) {
                sqlContext.sql("SELECT * FROM mytable").printSchema()
            }
            val df = sqlContext.sql(selectStr)
            df.collect.foreach(println)
            df.rdd.saveAsTextFile(fileName)
            mergeFiles(fileName, firstTime)
            firstTime = false
           println(rdd.name)
        }

共有1个答案

骆英纵
2023-03-14

问题出在使用的Kafka jars版本上,使用0.9.0.0修复了该问题。类Kafka.Message.MessageAndMetadata是在0.8.2.0中引入的。

 类似资料:
  • 本文向大家介绍MyBatis执行Sql的流程实例解析,包括了MyBatis执行Sql的流程实例解析的使用技巧和注意事项,需要的朋友参考一下 这篇文章主要介绍了MyBatis执行Sql的流程实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本博客着重介绍MyBatis执行Sql的流程,关于在执行过程中缓存、动态SQl生成等细节不在本博客中体现

  • 我期待关于Kakfa主题的消息,一旦我收到消息,我就会发出关于新主题的消息。我使用streams API来实现这一点,它很简单。但是,由于系统不可靠,我可能永远不会接收到,但如果已经接收到消息的(例如),并且在秒内没有记录新消息,我仍然希望发出消息。这在Kafka streams中是可能的吗?还是我需要为它写一个consumer? 如果Kafka Streams有类似于Rx(http://reac

  • 本文向大家介绍Javascript执行流程细节原理解析,包括了Javascript执行流程细节原理解析的使用技巧和注意事项,需要的朋友参考一下 Javascript从定义到执行,JS引擎在实现层做了很多初始化工作,因此在学习JS引擎工作机制之前,我们需要引入几个相关的概念:执行环境栈、全局对象、执行环境、变量对象、活动对象、作用域和作用域链等,这些概念正是JS引擎工作的核心组件。这篇文章的目的不是

  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在

  • 问题内容: 我有解析日期的代码,如下所示: 一切正常,突然,这停止了。原来,管理员在服务器上进行了一些配置更改,并且当前返回的日期为“ 2010-12-27T10:50:44.000-08:00”,上述模式无法解析该日期。我有两个问题: 第一种是哪种模式将解析上述格式的JVM返回的日期(特别是时区为“ -08:00”)?其次,在Linux RHEL 5服务器上,究竟会在哪里更改此类设置,以便我们将

  • 本文向大家介绍IntelliJ-Idea导出可执行Jar流程解析,包括了IntelliJ-Idea导出可执行Jar流程解析的使用技巧和注意事项,需要的朋友参考一下 前言 IntelliJ Idea 导出可执行Jar包,记录如下。 创建Java文件,在里面添加main方法 配置Artifacts File-Project Structure-Project Settings-Artifacts 点击