当前位置: 首页 > 知识库问答 >
问题:

将json提取到案例类SCALA FLINK时出错

郎子平
2023-03-14

问题在于在进行案例类提取时映射函数中。case类不可序列化。我隐式地定义了格式DefaultFormats

package org.apache.flink.quickstart
import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try


case class CC(key:String)

object WordCount{
  def main(args: Array[String]) {

    implicit val formats = org.json4s.DefaultFormats

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

   val st = env
       .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema() , properties))
       .flatMap(raw => JsonMethods.parse(raw).toOption)
//       .map(_.extract[CC])


    val l = st.map(_.extract[CC])

    st.print()
      env.execute()
  }
}

错误是:

INFO[main](TypeExtractor.java:1804)-未检测到类组织的字段。json4s。JsonAST$JValue。不能用作PojoType。将作为线程“main”组织中的GenericType异常处理。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:任务无法在组织中序列化。阿帕奇。Flink。应用程序编程接口。斯卡拉。ClosureCleaner美元。EnsureCleaner.scala:172)可在org。阿帕奇。Flink。应用程序编程接口。斯卡拉。ClosureCleaner美元。clean(ClosureCleaner.scala:164)位于org。阿帕奇。Flink。流动。应用程序编程接口。斯卡拉。拖缆执行环境。scalaClean(streamexecutionenEnvironment.scala:666),网址:org。阿帕奇。Flink。流动。应用程序编程接口。斯卡拉。数据流。clean(DataStream.scala:994)位于org。阿帕奇。Flink。流动。应用程序编程接口。斯卡拉。数据流。地图(DataStream.scala:519)位于org。阿帕奇。Flink。快速启动。WordCount$。main(WordCount.scala:38)位于org。阿帕奇。Flink。快速启动。字数。main(WordCount.scala)由以下原因引起:java。伊奥。NotSerializableException:org。json4s。java上的DefaultFormats$$anon$4。伊奥。ObjectOutputStream。java上的writeObject0(ObjectOutputStream.java:1184)。伊奥。ObjectOutputStream。java上的defaultWriteFields(ObjectOutputStream.java:1548)。伊奥。ObjectOutputStream。在java上编写SerialData(ObjectOutputStream.java:1509)。伊奥。ObjectOutputStream。java上的writeOrdinaryObject(ObjectOutputStream.java:1432)。伊奥。ObjectOutputStream。java上的writeObject0(ObjectOutputStream.java:1178)。伊奥。ObjectOutputStream。java上的defaultWriteFields(ObjectOutputStream.java:1548)。伊奥。ObjectOutputStream。在java上编写SerialData(ObjectOutputStream.java:1509)。伊奥。ObjectOutputStream。java上的writeOrdinaryObject(ObjectOutputStream.java:1432)。伊奥。ObjectOutputStream。java上的writeObject0(ObjectOutputStream.java:1178)。伊奥。ObjectOutputStream。writeObject(ObjectOutputStream.java:348)位于org。阿帕奇。Flink。util。实例化util。org上的serializeObject(InstantiationUtil.java:317)。阿帕奇。Flink。应用程序编程接口。斯卡拉。ClosureCleaner美元。确保可重复使用(ClosureCleaner.scala:170)。。。还有6个

Process finished with exit code 1

共有1个答案

孟德曜
2023-03-14

解决问题的办法

implicit val formats = org.json4s.DefaultFormats

在主要功能之外,比如

object WordCount{

    implicit val formats = org.json4s.DefaultFormats
    def main(args: Array[String])

或者懒洋洋地初始化格式,比如

implicit lazy val formats = org.json4s.DefaultFormats

里面的主要功能就像

def main(args: Array[String]) {
    implicit lazy val formats = org.json4s.DefaultFormats
 类似资料:
  • 我有一个包含3000多条记录的数据框架,其中包括每次观测的经纬度坐标。我想从每一组坐标中得到国家和州或省。 或者,一个更好的解决我的问题,获得空间信息是赞赏的! 下面是我的代码:

  • 它提供类实例信息,但不提供类实例的序列化内容。 我做错了什么?

  • 问题内容: 我正在尝试以JSON格式回显对象的内容。我对PHP完全没有经验,我想知道是否有预定义的函数可以执行此操作(例如json_encode()),还是您必须自己构建字符串?当谷歌搜索“ PHP对象到JSON”时,我只是在寻找垃圾。 我想要JSON返回的内容: {名称:“ $ name var的内容”,代码:1001,信息:执行请求时出错} 问题答案: 你就在那里。看一下与json_encod

  • 我的要求是插入供应商和目录,我只想单向映射。有没有反正解决它的单向只是因为我想从供应商(供应商)获取目录,我是新来的堆栈溢出,如果有任何编辑错误,请评论下来,我会修复它。 实体目录。Java语言 Supplier.java 服务类目录服务 供应商服务 我还想知道如何在spring boot中编写项目名称的搜索查询。假设供应商A有薯片、香蕉片、马沙拉片、咖啡。我想根据供应商ID搜索目录项,当我搜索“

  • 主要内容:实例,保存并执行代码,实例下面将创建一个名为 Customer 的 Ruby 类,声明两个方法: display_details:该方法用于显示客户的详细信息。 total_no_of_customers:该方法用于显示在系统中创建的客户总数量。 实例 #!/usr/bin/ruby class Customer @@no_of_customers=0 def initialize(id, name, addr) @cus

  • 线程“main”org.jsoup.HttpStatusException中出现异常:提取URL时出现HTTP错误。status=403,url=java Html解析器提取特定数据?在org.jsoup.helper.httpconnection$response.execute(httpconnection.java:590)在org.jsoup.helper.httpconnection$r