我尝试根据flink doc添加一个具有事件时间属性的表源。我的代码像:
class SISSourceTable
extends StreamTableSource[Row]
with DefinedRowtimeAttributes
with FlinkCal
with FlinkTypeTags {
private[this] val profileProp = ConfigurationManager.loadBusinessProperty
val topic: String = ...
val schemas = Seq(
(TsCol, SQLTimestamp),
(DCol, StringTag),
(CCol, StringTag),
(RCol, StringTag)
)
override def getProducedDataType: DataType = DataTypes.ROW(extractFields(schemas): _*)
override def getTableSchema: TableSchema =
new TableSchema.Builder()
.fields(extractFieldNames(schemas), extractFieldDataTypes(schemas))
.build()
override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
Collections.singletonList(
new RowtimeAttributeDescriptor(
TsCol,
new ExistingField(TsCol),
new AscendingTimestamps
)
)
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
val windowTime: Int = profileProp.getProperty("xxx", "300").toInt
val source = prepareSource(topic)
val colsToCheck = List(RCol, CCol, DCol)
execEnv
.addSource(source)
.map(new MapFunction[String, Map[String, String]]() {
override def map(value: String): Map[String, String] = ...
})
.map(new MapFunction[Map[String, String], Row]() {
override def map(value: Map[String, String]): Row = {
Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Row](Time.seconds(windowTime)) {
override def extractTimestamp(element: Row): Long = element.getField(0).asInstanceOf[Timestamp].getTime
})
}
}
我在getDataStream方法中得到的source
是一个Kafka字符串源。我从每条kafka记录中提取了一个TsCol。我想使用TsCol作为事件时间。但是TsCol是字符串数据类型的10位时间戳,所以我需要将其转换为13位Long数据类型。当我尝试使用13位Long数据作为行时时,我得到了异常,说行时只能从SQL_TIMESTAMP列中提取。所以我最终将ts col转换为java.sql.Timestamp。当我在Source Table上面注册并运行flink时。我得到了以下异常:
org.apache.flink.table.api.TableException: TableSource of type com.mob.mobeye.flink.table.source.StayInStoreSourceTable returned a DataStream of data type ROW<`t` TIMESTAMP(3), `mac` STRING, `c` STRING, `r` STRING> that does not match with the data type ROW<`t` TIMESTAMP(3), `mac` STRING, `c` STRING, `r` STRING> declared by the TableSource.getProducedDataType() method. Please validate the implementation of the TableSource.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:113)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:97)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:97)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:133)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:61)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
我很困惑,为什么
排
与数据类型不匹配
排
我在另一个地方也遇到了类似的错误,在那里我把时间戳替换成了Long,并且它起作用了。但在这里,我需要将列t提取为行时间,因此它必须是TIMESTAMP(3)类型。我非常感谢有人能帮助解决这个问题。
您使用的是哪个 flink 版本?如果我没有记错的话,你正在使用一个版本
如果是这样,异常消息不是很有帮助,因为它有一个错误,该错误已在 https://issues.apache.org/jira/browse/FLINK-15726 中修复。在此之前,实际上相同的类型被打印了两次。
您的实现存在一些问题。类型不匹配很可能是因为您生成了由中的< code>map运算符返回的< code > GenericTypeInformation
.map(new MapFunction[Map[String, String], Row]() {
override def map(value: Map[String, String]): Row = {
Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
}
})
尝试将其更改为
.map(new MapFunction[Map[String, String], Row]() {
override def map(value: Map[String, String]): Row = {
Row.of(new Timestamp(value(TsCol).toLong * 1000), value(DCol), value(CCol), value(RCol))
}
}).returns(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING, Types.STRING))
其次,您无需在 TableSource 中分配时间戳和水印。它们将根据通过DesoledRowtimeAttributes
提供的信息自动分配。
我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011 这在从IDE本地运行时非常有效。但是,
问题内容: 我收到以下错误: 但是我看不到哪个参数错误? 这是我使用的代码。 我已经添加并删除了,但出现了相同的错误。 问题答案: 您将月份和日期交换了: 否则将永远不会适合month参数的范围。 随着并以正确的顺序解析的工作原理: 您无需添加;可以正确解析较短的数字:
我如何在Apache flink中使用摄取时间特征。我知道我们需要设置环境时间特征。但是我如何收集带有时间戳的数据,可以称为摄取时间。目前我使用它时,它是根据系统时钟时间处理窗口。我想根据数据进入flink环境的时间进行处理。 有助于清晰理解的少量代码摘录: 环境的时间特征: 窗口时间: 源中的集合: 如果数据采集在11:03开始,我想在11:08结束,即5分钟。但它会在11点05分停止(某种程度
我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。 提前道谢!
问题内容: $(‘div’).data(‘info’, 1); 我在jquery中创建元素。之后,我要添加属性“数据”。他很喜欢,并且被添加了,但是在DOM中,这并不明显,我无法使用 问题答案: 使用方法: 请注意,这不会创建实际的属性。如果需要创建属性,请使用:
我在jQuery中创建元素。之后,我想添加属性“数据”。他的喜欢,并添加,但在DOM中,这是不明显的,我不能得到的项目,使用 jsfiddle