CREATE TABLE source_table (
userId INT,
eventTime as '2021-10-01 08:08:08',
eventType as 'click',
productId INT,
-- 数组(Array)类型
productImages as ARRAY['image1','image2'],
-- 对象(Map)类型
pageInfo as MAP['pageId','1','pageName','yyds']
) WITH (
'connector' = 'datagen',
'number-of-rows' = '2',
'fields.userId.kind' = 'random',
'fields.userId.min' = '2',
'fields.userId.max' = '2',
'fields.productId.kind' = 'sequence',
'fields.productId.start' = '1',
'fields.productId.end' = '2'
);
将Map展开为多列多行。
SELECT userId,eventTime,eventType,productId,mapKey,mapValue
FROM source_table, UNNEST(pageInfo) as t(mapKey,mapValue);
package com.bigdata.flink.sql.udtf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.util.Map;
@FunctionHint(
input = @DataTypeHint("MAP<STRING, STRING>"),
output = @DataTypeHint("ROW<mapKey STRING, mapValue STRING>"))
public class ExpandMapMultColumnMultRowUDTF extends TableFunction {
public void eval(Map<String, String> pageInfo) {
for (Map.Entry<String, String> entry : pageInfo.entrySet()) {
// 原来的一行,每个Key都输出一行
collect(Row.of(entry.getKey(), entry.getValue()));
}
}
}
// SQL使用
SELECT userId,eventTime,eventType,productId,mapKey,mapValue FROM source_table
, LATERAL TABLE (ExpandMapMultColumnMultRowUDTF(`pageInfo`)) AS t(mapKey,mapValue)
userId eventTime eventType productId mapKey mapValue
2 2021-10-01 08:08:08 click 1 pageId 1
2 2021-10-01 08:08:08 click 1 pageName yyds
2 2021-10-01 08:08:08 click 2 pageId 1
2 2021-10-01 08:08:08 click 2 pageName yyds
SELECT userId,eventTime,eventType,productId,productImage
FROM source_table, UNNEST(productImages) as t(productImage);
package com.bigdata.flink.sql.udtf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
@FunctionHint(
input = @DataTypeHint("ARRAY<String>"),
output = @DataTypeHint("String"))
public class ExpandArrayOneColumnMultRowUDTF extends TableFunction {
public void eval(String... productImages) {
for (String productImage : productImages) {
collect(productImage);
}
}
}
// SQL
SELECT userId, eventTime, eventType, productId, productImage FROM source_table
, LATERAL TABLE (ExpandArrayOneColumnMultRowUDTF(`productImages`)) AS t(productImage);
userId eventTime eventType productId productImage
2 2021-10-01 08:08:08 click 1 image1
2 2021-10-01 08:08:08 click 1 image2
2 2021-10-01 08:08:08 click 2 image1
2 2021-10-01 08:08:08 click 2 image2
在 SQL 任务里面经常会遇到一列转多行的需求,今天就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例.
需求
原始数据格式如下:
name data
JasonLee [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]
data 格式化
{
"name": "JasonLee",
"data": [{
"content_type": "flink",
"url": "111"
}, {
"content_type": "spark",
"url": "222"
},
{
"content_type": "hadoop",
"url": "333"
}
]
}
现在希望得到的数据格式是这样的:
name content_type url
JasonLee flink 111
JasonLee spark 222
JasonLee hadoop 333
这是一个典型的列转行或者一行转多行的场景,需要将 data 列进行拆分成为多行多列,下面介绍两种实现方式.
使用 Flink 自带的 unnest 函数解析
使用自定义 UDTF 函数解析
建表 DDL
CREATE TABLE kafka_table (
name string,
`data` ARRAY<ROW<content_type STRING,url STRING>>
)
WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'test',
'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092', -- broker连接信息
'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id
'scan.startup.mode' = 'latest-offset', -- 读取数据的位置
'format' = 'json', -- 数据源格式为 json
'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
'json.ignore-parse-errors' = 'true' -- 解析失败跳过
)
这里在定义 data 字段类型的时候需要定义为 ARRAY 类型,因为 unnest 函数需要一个数组类型的参数.
--------------------------------------------------
1 unnest解析
select name,content_type,url
from kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)
select name,content_type,url
from kafka_table, UNNEST(`data`) AS t (content_type,url)
select name,content_type,url
from kafka_table left join UNNEST(`data`) AS t (content_type,url) on true
2 自定义 UDTF 解析
自定义表值函数(UDTF),自定义表值函数,将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法, 在使用 UDTF 时,需要带上 LATERAL TABLE两个关键字.
@FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))
public class ParserJsonArrayTest extends TableFunction<Row> {
private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);
public void eval(String value) {
try {
JSONArray snapshots = JSONArray.parseArray(value);
Iterator<Object> iterator = snapshots.iterator();
while (iterator.hasNext()) {
JSONObject jsonObject = (JSONObject) iterator.next();
String content_type = jsonObject.getString("content_type");
String url = jsonObject.getString("url");
collect(Row.of(content_type,url));
}
} catch (Exception e) {
log.error("parser json failed :" + e.getMessage());
}
}
}
自定义 UDTF 解析的时候,就不需要把 data 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.
Flink SQL 使用 UDTF
select name,content_type,url
from kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
select name,content_type,url
from kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
select name,content_type,url
from kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true
注意:
unnest 和 自定义 UDTF 函数在使用的时候都有 3 种写法,前面两种写法的效果其实是一样的,第三种写法相当于 left join 的用法.区别在于 CROSS JOIN/INNER JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行不输出.LEFT JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行会输出,右侧 UDTF 字段为 null
打印的结果
2> JasonLee,flink,111
2> JasonLee,spark,222
2> JasonLee,hadoop,333
总结
在实际使用的时候如果 unnest 可以满足需求就直接用 unnest 不需要带来额外的开发,如果 unnest 函数满足不了需求,那么就自定义 UDTF 去完成.
end