当前位置: 首页 > 知识库问答 >
问题:

Flink表API

霍财
2023-03-14

我正在流媒体环境中使用Flink的表API和/或Flink的SQL支持(Flink 1.3.1、Scala 2.11)。我从一个数据流【Person】开始,Person是一个case类,看起来像:

Person(name: String, age: Int, attributes: Map[String, String])

在我开始将属性带入图片之前,一切都按预期进行。

例如:

val result = html" target="_blank">streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

。。。导致:

线程“main”组织中出现异常。阿帕奇。Flink。桌子api。TableException:不支持类型:组织中的任何。阿帕奇。Flink。桌子api。TableException$。在组织中应用(例外。scala:53)。阿帕奇。Flink。桌子方解石FlinkTypeFactory美元。toTypeInfo(FlinkTypeFactory.scala:341)位于org。阿帕奇。Flink。桌子计划必然的LogicalRelNode$$anonfun$12。在组织中应用(operators.scala:531)。阿帕奇。Flink。桌子计划必然的LogicalRelNode$$anonfun$12。在scala上应用(运算符。scala:530)。收集TraversableLike$$anonfun$映射$1。在scala上应用(TraversableLike.scala:234)。收集TraversableLike$$anonfun$映射$1。在scala上应用(TraversableLike.scala:234)。收集迭代器$类。scala上的foreach(迭代器。scala:893)。收集抽象迭代器。scala的foreach(迭代器。scala:1336)。收集IterableLike$类。scala的foreach(IterableLike.scala:72)。收集可抽象。scala的foreach(Iterable.scala:54)。收集TraversableLike$类。在scala处映射(TraversableLike.scala:234)。收集可抽象遍历。地图(可遍历。scala:104)位于组织。阿帕奇。Flink。桌子计划必然的LogicalRelNode。(operators.scala:530)位于组织。阿帕奇。Flink。桌子api。TableEnvironment。sql(TableEnvironment.scala:503)位于com。诺德斯特罗姆。mdt。工作$。com上的main(Job.scala:112)。诺德斯特罗姆。mdt。工作主(作业.scala)

注意:无论是否存在特定的地图键,都会发生此错误。另请注意,如果我根本没有指定地图键,我会得到一个不同的错误,这是有意义的;这种情况在这里不起作用。

这个PR似乎说有一条前进的道路:https://github.com/apache/flink/pull/3767.具体看一下测试用例,它表明类型信息可以使用DataSets。来自DataStream和的相关方法ynsterDataStream都没有提供提供类型信息的方法。

这可能吗?换句话说,流上的Flink SQL能支持映射吗?

澄清编辑...当省略map键(GROUP BY...属性而不是属性['foo'])时,我得到了下面的错误。这表明运行时确实知道这些是字符串。

此类型(接口scala.collection.immutable.Map[scala. Tuple2(_1: String,_2: String)])不能用作键。


共有1个答案

东郭自强
2023-03-14
匿名用户

目前,Flink SQL只支持Java。util。地图。Scala映射被视为具有Flink数据类型的黑盒。因此,您可以转发这些黑盒并在标量函数中使用它们,但不支持使用['key']操作符进行访问。

因此,要么使用Java映射,要么自己在UDF中实现访问操作。

我为您的问题创建了一个问题:https://issues.apache.org/jira/browse/FLINK-7360

 类似资料:
  • 不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?

  • 我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。 所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗? 向你致意,伊格内修斯

  • 我试图从动态表和基于某些字段的流中派生新表。 有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。 书籍 ============================ BookId, Instruments, Quantity Book1, Goog,100 Book2, Vod,10 Book1, Appl,50 Book2, Goog,60 Book1, Vod,130 Book3,

  • 亲爱的, 我在Flink客户端查询hive表,返回的结果都是NULL。这个蜂巢桌是兽人桌。我不知道为什么 Flink SQL Flink SQL

  • 我尝试通过IDs连接两个数据流,发现有两个API集可以这样做, < Li > https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining . html < Li > https://ci . Apache . org/projects/flink/flink-docs-r

  • 我试图将数据从kafka主题读入DataStream并注册DataStream,然后使用tableEnvironment.sqlQuery(“sql”)查询数据,当tableEnvironment.execute()没有错误也没有输出时。 依赖关系: flink-streaming-java2.11版本:1.9.0-csa1.0.0.0; Flink-Streaming-Scala2.11版本:1