当前位置: 首页 > 工具软件 > ICETutorial > 使用案例 >

pyflink连接iceberg 实践

锺离珂
2023-12-01

参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/index.html

pyflink是什么

  1. 数据流处理的框架
  2. 这个框架是同时运行在多台主机上
  3. 通过某种方式这多台主机之间可以通信
  4. 可以单机运行
  5. pyflink只是对java的flink的一个调用工具,不能直接用python来对sourcesink组件进行实现。

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。

PyFlink 架构

PyFlink 的核心目标:

1.将 Flink 能力输出到 Python 用户,进而可以让 Python 用户使用所有的 Flink 能力。

2.将 Python 生态现有的分析计算功能运行到 Flink 上,进而增强 Python 生态对大数据问题的解决能力。

应用场景

  • 第一个,事件驱动型,比如:刷单,监控等;
  • 第二个,数据分析型的,比如:库存,双11大屏等;
  • 第三个适用的场景是数据管道,也就是ETL场景,比如一些日志的解析等;
  • 第四个场景,机器学习,比如个性推荐等。

API

Flink 为流/批处理应用程序提供了不同级别的抽象。

  • SQL
  • Table API
  • DataStream/DataSet API(核心 API)
  • Stateful Stream Processing

PyFlink API 完全与 Java Table API 对齐,各种关系操作都支持,同时对 window 也有很好的支持,除了这些 APIs,PyFlink还提供多种定义 Python UDF 的方式.

UDF自定义函数

首先,可以扩展 ScalarFunction,这种方式可以提供更多的辅助功能,比如添加 Metrics 。除此之外 Python 语言所支持的任何方式的方法定义,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。

当定义完方法后,用 PyFlink 所提供的 Decorators 进行打标,并描述 input 和 output 的数据类型就可以了。后面版本也可以根据 Python 语言的 type hint 特性再进一步简化,进行类型推导。

from pyflink.table import ScalarFunction, DataTypes
from pyflink.table.udf import udf

# Extend ScalarFunction
class ADD(ScalarFunction):
    def eval(self, i ,j):
        return i + j

add1 = udf (ADD(), [DataTypes.BIGINT(), DataTypes.BIGINT()] ,DataTypes.BIGINT())

# Named function
@udf(input_types =  [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
def add2( i ,j):
    return i + j

# Lambda function
add3 = udf(lambda i,j :i+j, [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())

# Callable Function
class CallableAdd(object):
    def __call__(self, i,j):
        return i + j

add4 = udf(CallableAdd(),[DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())

pyflink安装

pip3  install -i https://pypi.tuna.tsinghua.edu.cn/simple/ apache-flink==1.13.5

最好指定版本,如1.13.2

实战

apache-flink 1.13.2

读取iceberg数据

pyflink安装目录/lib

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

hive-exec-3.1.2.jar

alluxio-2.6.2-client.jar

iceberg-flink-runtime-1.13-0.13.1.jar

import os


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment,StreamTableEnvironment

os.environ.setdefault('HADOOP_USER_NAME', 'root')

env = StreamExecutionEnvironment.get_execution_environment()
env_settings  = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env,environment_settings=env_settings )


t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

iceberg_hive_catalog = """
    CREATE CATALOG iceberg WITH
    (
        'type'='iceberg'
        ,'catalog-type'='hive' -- 可选 catalog类型 hive、hadoop、custom
        ,'property-version'='1' -- 可选 属性版本号,可向后兼容,目前版本号为1
        ,'cache-enabled' = 'true' -- 可选 是否启用catalog缓存 默认为true
        ,'uri'='thrift://192.168.xxx.xxx:9083' -- 必填 hive 元数据存储连接
        ,'clients'='5' -- hive metastore clients连接池大小,默认为2
        ,'warehouse'='hdfs://ns1/lakehouse/'
    )
""" 
 

t_env.get_current_catalog()
t_env.get_current_database()
# t_env.execute_sql(iceberg_hive_catalog).print()
 

t_env.execute_sql("use catalog iceberg").print()

t_env.execute_sql("show current catalog").print()
#
t_env.execute_sql("show databases").print()

t_env.execute_sql("use dbname").print()

t_env.execute_sql("show tables").print()
 

 
table1 = t_env.execute_sql(
    "select * from  ***")

table2 = t_env.sql_query(
    "select * from  xxx")
 

pd = table2.to_pandas()

读取mysql数据

配置

pyflink安装目录/lib

mysql-connector-java-8.0.16.jar

flink-connector-jdbc_2.12-1.13.2.jar(https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.13.2)

mysql创表

CREATE TABLE `flink_test` (

  `f0` int(11) DEFAULT NULL,
  `f1` int(11) DEFAULT NULL
) 

insert into flink_test VALUES(1,11)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table import EnvironmentSettings, TableEnvironment
# create environment
from pyflink.table.expressions import lit
 
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)
 


mysql_sink_ddl = """
CREATE TABLE flink_test (
  id BIGINT,
  word VARCHAR,
 `count` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
 'connector.type' = 'jdbc', -- 使用 jdbc connector
 'connector.url' = 'jdbc:mysql://192.168.xx.xx:3306/test',
 'connector.table' = 'flink_test',
 'connector.username' = 'xxx',
 'connector.password' = 'xxx',
 'connector.write.flush.interval' = '1s'
)
"""

mysql_sink_ddl = """
create table flink_test (
f0 INT,
f1 INT
) WITH (
'connector' = 'jdbc', -- 使用 jdbc connector
 'url' = 'jdbc:mysql://192.168.xx.xxx:3306/test',
 'username' = 'xx',
 'table-name' = 'flink_test',
 'password' = 'xx'
  
)
"""
 



t_env.execute_sql(mysql_sink_ddl)
table = t_env.execute_sql("select * from flink_test")
# +-------------+-------------+
# |          f0 |          f1 |
# +-------------+-------------+
# |           1 |          11 |
# +-------------+-------------+
# 1 row in set
table = t_env.execute_sql("insert into  flink_test values(2,22)")

table2 = t_env.sql_query('select * from flink_test')
# tab = t_env.from_path('flink_test')
table2.to_pandas()
#    f0  f1
# 0   1  11
# 1   2  22

读取一个 csv 文件,计算词频,并将结果写到一个结果文件中

文件名:word_count.py

参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# 创建TableEnvironment 。这是Python Table API作业的入口类。
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

# 创建源表和结果表,在ExecutionEnvironment中注册表名分别为mySource和mySink的表。
t_env.connect(FileSystem().path('./input/word_count.csv')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('./ouput/output.csv')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')
    
# 下列代码实现的输出文件名是乱码,'/ouput/output.csv'被当作目录
'''
my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = './input/word_count.csv'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/ouput/output.csv'
    )
"""
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
'''
# 该作业读取表mySource中的数据
tab = t_env.from_path('mySource')
# 启动Flink Python Table API作业
# 当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

报错

Could not find any factory for identifier ‘jdbc‘

flink-connector-jdbc_2.12-1.13.2.jar包没有放入指定位置

org.apache.flink.sql.parser.impl.ParseException: Encountered “)” at line 12, column 1.

建表语句最后一个括号前多了个逗号

参考

实例:

csv读入参数:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/

使用mysql_Flink 使用python连接mysql: https://blog.csdn.net/weixin_32136203/article/details/112684291

 类似资料: