我正在尝试将Scala UDF(Scala.collection.immutable.map)的对象输出映射到表API中的某个有效数据类型,即通过Java类型(
Java.util.map
)映射到这里推荐的表API中的某个有效数据类型:Flink Table API
你知道正确的方法吗?如果是,是否有方法将转换推广到类型为Map[字符串,任意]
的(嵌套)Scala对象?
代码
Scala自定义项
class dummyMap() extends ScalarFunction {
def eval() = {
val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
whatevermap.asInstanceOf[java.util.Map[java.lang.String,java.lang.String]]
}
}
下沉
my_sink_ddl = f"""
create table mySink (
output_of_dummyMap_udf MAP<STRING,STRING>
) with (
...
)
"""
错误
Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<java.util.Map>]
TableSink schema: [output_of_my_scala_udf: Map<String, String>]
谢谢
魏忠的原始答案。我只是个记者。谢谢小薇!
此时(Flink 1.11),有两种方法正在工作:
代码
Scala自定义项
package com.dummy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
class dummyMap extends ScalarFunction {
// If the udf would be registered by the SQL statement, you need add this typehint
@DataTypeHint("ROW<s STRING,t STRING>")
def eval(): Row = {
Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
}
// If the udf would be registered by the method 'register_java_function', you need override this
// method.
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
// The type of the return values should be TypeInformation
Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
}
}
Python代码
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)
# load the scala udf jar file, the path should be modified to yours
# or your can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
# register the udf via
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
'connector' = 'print'
)""")
# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
我在Spark中有一个数据框架,其中包含许多列和我定义的udf。我想要返回相同的数据帧,除了一列被转换。此外,我的udf接收字符串并返回时间戳。有一个简单的方法可以做到这一点吗?我试过了 但这返回一个RDD,并且只返回转换后的列。
有人能分享一下如何将转换为吗?
我需要在代码的几个地方将这个映射转换为我的case类,如下所示: 最简单的方法是什么?我能用隐式吗?
我有两个嵌套的case类: 当然,解决方案应该是泛型的,并且适用于任何case类。 注意:这个讨论很好地回答了如何将单个case类映射到映射。但我无法将其用于嵌套的case类。相反,我得到了:
我有一张这样的地图<代码>地图 我使用了对象映射器,但它不工作,因为映射在我的结构中包含另一个映射。
我有以下代码,希望使用Java8将列表转换为。 当我尝试将列表中的单个值映射为映射的键时,我得到了一个错误。