参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/index.html
pyflink
只是对java的flink的一个调用工具,不能直接用python
来对source
、sink
组件进行实现。Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
PyFlink 的核心目标:
1.将 Flink 能力输出到 Python 用户,进而可以让 Python 用户使用所有的 Flink 能力。
2.将 Python 生态现有的分析计算功能运行到 Flink 上,进而增强 Python 生态对大数据问题的解决能力。
Flink 为流/批处理应用程序提供了不同级别的抽象。
PyFlink API 完全与 Java Table API 对齐,各种关系操作都支持,同时对 window 也有很好的支持,除了这些 APIs,PyFlink还提供多种定义 Python UDF 的方式.
首先,可以扩展 ScalarFunction,这种方式可以提供更多的辅助功能,比如添加 Metrics 。除此之外 Python 语言所支持的任何方式的方法定义,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。
当定义完方法后,用 PyFlink 所提供的 Decorators 进行打标,并描述 input 和 output 的数据类型就可以了。后面版本也可以根据 Python 语言的 type hint 特性再进一步简化,进行类型推导。
from pyflink.table import ScalarFunction, DataTypes
from pyflink.table.udf import udf
# Extend ScalarFunction
class ADD(ScalarFunction):
def eval(self, i ,j):
return i + j
add1 = udf (ADD(), [DataTypes.BIGINT(), DataTypes.BIGINT()] ,DataTypes.BIGINT())
# Named function
@udf(input_types = [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
def add2( i ,j):
return i + j
# Lambda function
add3 = udf(lambda i,j :i+j, [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
# Callable Function
class CallableAdd(object):
def __call__(self, i,j):
return i + j
add4 = udf(CallableAdd(),[DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple/ apache-flink==1.13.5
最好指定版本,如1.13.2
apache-flink 1.13.2
pyflink安装目录/lib
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
hive-exec-3.1.2.jar
alluxio-2.6.2-client.jar
iceberg-flink-runtime-1.13-0.13.1.jar
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment,StreamTableEnvironment
os.environ.setdefault('HADOOP_USER_NAME', 'root')
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env,environment_settings=env_settings )
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
iceberg_hive_catalog = """
CREATE CATALOG iceberg WITH
(
'type'='iceberg'
,'catalog-type'='hive' -- 可选 catalog类型 hive、hadoop、custom
,'property-version'='1' -- 可选 属性版本号,可向后兼容,目前版本号为1
,'cache-enabled' = 'true' -- 可选 是否启用catalog缓存 默认为true
,'uri'='thrift://192.168.xxx.xxx:9083' -- 必填 hive 元数据存储连接
,'clients'='5' -- hive metastore clients连接池大小,默认为2
,'warehouse'='hdfs://ns1/lakehouse/'
)
"""
t_env.get_current_catalog()
t_env.get_current_database()
# t_env.execute_sql(iceberg_hive_catalog).print()
t_env.execute_sql("use catalog iceberg").print()
t_env.execute_sql("show current catalog").print()
#
t_env.execute_sql("show databases").print()
t_env.execute_sql("use dbname").print()
t_env.execute_sql("show tables").print()
table1 = t_env.execute_sql(
"select * from ***")
table2 = t_env.sql_query(
"select * from xxx")
pd = table2.to_pandas()
pyflink安装目录/lib
mysql-connector-java-8.0.16.jar
flink-connector-jdbc_2.12-1.13.2.jar(https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.13.2)
mysql创表
CREATE TABLE `flink_test` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL
)
insert into flink_test VALUES(1,11)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table import EnvironmentSettings, TableEnvironment
# create environment
from pyflink.table.expressions import lit
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)
mysql_sink_ddl = """
CREATE TABLE flink_test (
id BIGINT,
word VARCHAR,
`count` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://192.168.xx.xx:3306/test',
'connector.table' = 'flink_test',
'connector.username' = 'xxx',
'connector.password' = 'xxx',
'connector.write.flush.interval' = '1s'
)
"""
mysql_sink_ddl = """
create table flink_test (
f0 INT,
f1 INT
) WITH (
'connector' = 'jdbc', -- 使用 jdbc connector
'url' = 'jdbc:mysql://192.168.xx.xxx:3306/test',
'username' = 'xx',
'table-name' = 'flink_test',
'password' = 'xx'
)
"""
t_env.execute_sql(mysql_sink_ddl)
table = t_env.execute_sql("select * from flink_test")
# +-------------+-------------+
# | f0 | f1 |
# +-------------+-------------+
# | 1 | 11 |
# +-------------+-------------+
# 1 row in set
table = t_env.execute_sql("insert into flink_test values(2,22)")
table2 = t_env.sql_query('select * from flink_test')
# tab = t_env.from_path('flink_test')
table2.to_pandas()
# f0 f1
# 0 1 11
# 1 2 22
文件名:word_count.py
参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
# 创建TableEnvironment 。这是Python Table API作业的入口类。
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)
# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# 创建源表和结果表,在ExecutionEnvironment中注册表名分别为mySource和mySink的表。
t_env.connect(FileSystem().path('./input/word_count.csv')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('./ouput/output.csv')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
# 下列代码实现的输出文件名是乱码,'/ouput/output.csv'被当作目录
'''
my_source_ddl = """
create table mySource (
word VARCHAR
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = './input/word_count.csv'
)
"""
my_sink_ddl = """
create table mySink (
word VARCHAR,
`count` BIGINT
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/ouput/output.csv'
)
"""
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
'''
# 该作业读取表mySource中的数据
tab = t_env.from_path('mySource')
# 启动Flink Python Table API作业
# 当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
tab.group_by(tab.word) \
.select(tab.word, lit(1).count) \
.execute_insert('mySink').wait()
flink-connector-jdbc_2.12-1.13.2.jar包没有放入指定位置
建表语句最后一个括号前多了个逗号
实例:
csv读入参数:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/
使用mysql_Flink 使用python连接mysql: https://blog.csdn.net/weixin_32136203/article/details/112684291