当前位置: 首页 > 知识库问答 >
问题:

如何在Spark-Avro2.4模式中设置logicalType?

钦高峯
2023-03-14

我们从应用程序中的avro文件中读取时间戳信息。我正在测试从Spark2.3.1到Spark2.4的升级,其中包括新内置的spark-avro集成。但是,我不知道如何告诉avro模式,我希望时间戳具有“timestamp-millis”的逻辑类型,而不是缺省的“timestamp-micros”。

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

正如我们所看到的,底层类型仍然是long,但它现在被一个逻辑类型“timestamp-micros”所增强。这与发布说明中所说的完全一致,但是,我无法找到一种方法来指定使用'timestamp-millis'选项的模式。

这就成了一个问题,当我向一个avro文件写入一个Timestamp对象时,它被初始化为在epoch之后10,000秒,它将被重新读取为10,000,000秒。在2.3.1/databricks-avro中,它只是一个没有相关信息的长代码,所以它就像它进去一样出来了。

我们当前通过对感兴趣的对象进行如下反映来构建模式:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })
case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

我还尝试通过两种方式从JSON表示创建模式:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

第一个尝试是简单地创建一个数据类型

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

这是失败的,因为它无法为searchTime节点创建StructType,因为其中有“LogicalType”。第二个尝试是通过传入原始JSON字符串来简单地创建模式。

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)
mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

共有1个答案

韦熙云
2023-03-14

好吧,我想我回答了我自己的问题。当我修改以编程方式构建的架构以使用显式时间戳类型时

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

我没有改变逻辑时,我们做我们的读取,当我们有一个行对象,我们读回来。最初,我们会读一个很长的,并将其转换为时间戳,这是事情出错的地方,因为它读回来的长度为微秒,这将使它比我们预期的大1000倍。将我们的read更改为read Timestamp对象,直接让底层逻辑来解释这一点,将它从我们(我)手中拿走。所以:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
 类似资料:
  • 多线程。在这种模式下,SQLite可以安全地由多个线程使用,前提是在两个或多个线程中不同时使用单个数据库连接。 序列化。在序列化模式下,SQLite可以安全地由多个线程使用,不受限制。

  • 问题内容: 我需要解决我目前遇到的Unix-Windows文件格式(LF到CRLF)问题。我正在使用的ftp客户端是Jcraft的Jsch。 尽管我遇到了可以设置的标志,但在线文档非常空白 启用ASCII模式,但是我看不到应该在代码本身中设置的位置,也看不到这些Javadocs中提到的内容 以下是我自己的解决方法。“新添加的”行显示了如何获取文件并将其转换为ASCII编码的字符串,然后使用chan

  • 当我为数据库生成DDL时,我不想有任何外键。 根据< code > javax . persistence . constraint mode ,有一个默认值: 那么如何在Hibernate中设置默认值呢? 我在JPA中使用Hibernate通过Spring Boot数据。

  • 我在Azure上有一个Databricks5.3集群,它运行Apache Spark 2.4.0和Scala 2.11。 我不是Java/Scala开发人员,也不熟悉Spark/Databricks。我找不到Spark用来解析值的datetime格式化程序。 我尝试的模式:

  • 设置 应用设置允许用户选择他们对应用行为的偏好。它们授予用户真实的控制感,并且避免用户被同样的问题反复打扰。 访问设置 由于用户并不需要经常使用设置,所以它们在 UI 中并不显眼。应用中访问设置时:在任何情况下,进入“设置”的按钮都应简单命名为“设置”。如果当前的页面支持左导航栏,那么把设置放在导航栏中除“帮助及反馈”外的所有按钮的下方。另外,如果当前页面里有工具栏,把设置放在工具栏的更多操作(a