当前位置: 首页 > 知识库问答 >
问题:

按表源添加时间属性时,Flink数据类型不匹配

锺星腾
2023-03-14

我尝试根据flink doc添加一个具有事件时间属性的表源。我的代码像:

class SISSourceTable
    extends StreamTableSource[Row]
    with DefinedRowtimeAttributes
    with FlinkCal
    with FlinkTypeTags {
  private[this] val profileProp = ConfigurationManager.loadBusinessProperty
  val topic: String = ...
  val schemas = Seq(
    (TsCol, SQLTimestamp),
    (DCol, StringTag),
    (CCol, StringTag),
    (RCol, StringTag)
  )

  override def getProducedDataType: DataType = DataTypes.ROW(extractFields(schemas): _*)

  override def getTableSchema: TableSchema =
    new TableSchema.Builder()
      .fields(extractFieldNames(schemas), extractFieldDataTypes(schemas))
      .build()

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
    Collections.singletonList(
      new RowtimeAttributeDescriptor(
        TsCol,
        new ExistingField(TsCol),
        new AscendingTimestamps
      )
    )

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    val windowTime: Int = profileProp.getProperty("xxx", "300").toInt
    val source = prepareSource(topic)
    val colsToCheck = List(RCol, CCol, DCol)

    execEnv
      .addSource(source)
      .map(new MapFunction[String, Map[String, String]]() {
        override def map(value: String): Map[String, String] = ...
      })
      .map(new MapFunction[Map[String, String], Row]() {
        override def map(value: Map[String, String]): Row = {
          Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
        }
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Row](Time.seconds(windowTime)) {
        override def extractTimestamp(element: Row): Long = element.getField(0).asInstanceOf[Timestamp].getTime
      })
  }
}

我在getDataStream方法中得到的source是一个Kafka字符串源。我从每条kafka记录中提取了一个TsCol。我想使用TsCol作为事件时间。但是TsCol是字符串数据类型的10位时间戳,所以我需要将其转换为13位Long数据类型。当我尝试使用13位Long数据作为行时时,我得到了异常,说行时只能从SQL_TIMESTAMP列中提取。所以我最终将ts col转换为java.sql.Timestamp。当我在Source Table上面注册并运行flink时。我得到了以下异常:

org.apache.flink.table.api.TableException: TableSource of type com.mob.mobeye.flink.table.source.StayInStoreSourceTable returned a DataStream of data type ROW<`t` TIMESTAMP(3), `mac` STRING, `c` STRING, `r` STRING> that does not match with the data type ROW<`t` TIMESTAMP(3), `mac` STRING, `c` STRING, `r` STRING> declared by the TableSource.getProducedDataType() method. Please validate the implementation of the TableSource.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:113)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:97)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:97)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:133)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:61)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
    at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)

我很困惑,为什么

与数据类型不匹配

我在另一个地方也遇到了类似的错误,在那里我把时间戳替换成了Long,并且它起作用了。但在这里,我需要将列t提取为行时间,因此它必须是TIMESTAMP(3)类型。我非常感谢有人能帮助解决这个问题。

共有1个答案

商佑运
2023-03-14

您使用的是哪个 flink 版本?如果我没有记错的话,你正在使用一个版本

如果是这样,异常消息不是很有帮助,因为它有一个错误,该错误已在 https://issues.apache.org/jira/browse/FLINK-15726 中修复。在此之前,实际上相同的类型被打印了两次。

您的实现存在一些问题。类型不匹配很可能是因为您生成了由中的< code>map运算符返回的< code > GenericTypeInformation

      .map(new MapFunction[Map[String, String], Row]() {
        override def map(value: Map[String, String]): Row = {
          Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
        }
      })

尝试将其更改为

      .map(new MapFunction[Map[String, String], Row]() {
        override def map(value: Map[String, String]): Row = {
          Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
        }
      }).returns(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING, Types.STRING))

其次,您无需在 TableSource 中分配时间戳和水印。它们将根据通过DesoledRowtimeAttributes提供的信息自动分配。

 类似资料:
  • 我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011 这在从IDE本地运行时非常有效。但是,

  • 问题内容: 我收到以下错误: 但是我看不到哪个参数错误? 这是我使用的代码。 我已经添加并删除了,但出现了相同的错误。 问题答案: 您将月份和日期交换了: 否则将永远不会适合month参数的范围。 随着并以正确的顺序解析的工作原理: 您无需添加;可以正确解析较短的数字:

  • 我如何在Apache flink中使用摄取时间特征。我知道我们需要设置环境时间特征。但是我如何收集带有时间戳的数据,可以称为摄取时间。目前我使用它时,它是根据系统时钟时间处理窗口。我想根据数据进入flink环境的时间进行处理。 有助于清晰理解的少量代码摘录: 环境的时间特征: 窗口时间: 源中的集合: 如果数据采集在11:03开始,我想在11:08结束,即5分钟。但它会在11点05分停止(某种程度

  • 我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。 提前道谢!

  • 问题内容: $(‘div’).data(‘info’, 1); 我在jquery中创建元素。之后,我要添加属性“数据”。他很喜欢,并且被添加了,但是在DOM中,这并不明显,我无法使用 问题答案: 使用方法: 请注意,这不会创建实际的属性。如果需要创建属性,请使用:

  • 我在jQuery中创建元素。之后,我想添加属性“数据”。他的喜欢,并添加,但在DOM中,这是不明显的,我不能得到的项目,使用 jsfiddle