pyetl是一个纯python开发的ETL框架, 相比sqoop, datax 之类的ETL工具,pyetl可以对每个字段添加udf函数,使得数据转换过程更加灵活,相比专业ETL工具pyetl更轻量,纯python代码操作,更加符合开发人员习惯
安装
pip3 install pyetl
使用示例
数据库表之间数据同步
from pyetl import Task, DatabaseReader, DatabaseWriter reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target") Task(reader, writer).start()
数据库表到hive表同步
from pyetl import Task, DatabaseReader, HiveWriter2 reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = HiveWriter2("hive://localhost:10000/default", table_name="target") Task(reader, writer).start()
数据库表同步es
from pyetl import Task, DatabaseReader, ElasticSearchWriter reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget") Task(reader, writer).start()
原始表目标表字段名称不同,需要添加字段映射
添加
# 原始表source包含uuid,full_name字段 reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source") # 目标表target包含id,name字段 writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target") # columns配置目标表和原始表的字段映射关系 columns = {"id": "uuid", "name": "full_name"} Task(reader, writer, columns=columns).start()
字段的udf映射,对字段进行规则校验、数据标准化、数据清洗等
# functions配置字段的udf映射,如下id转字符串,name去除前后空格 functions={"id": str, "name": lambda x: x.strip()} Task(reader, writer, columns=columns, functions=functions).start()
继承Task类灵活扩展ETL任务
import json from pyetl import Task, DatabaseReader, DatabaseWriter class NewTask(Task): reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source") writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target") def get_columns(self): """通过函数的方式生成字段映射配置,使用更灵活""" # 以下示例将数据库中的字段映射配置取出后转字典类型返回 sql = "select columns from task where name='new_task'" columns = self.writer.db.read_one(sql)["columns"] return json.loads(columns) def get_functions(self): """通过函数的方式生成字段的udf映射""" # 以下示例将每个字段类型都转换为字符串 return {col: str for col in self.columns} def apply_function(self, record): """数据流中对一整条数据的udf""" record["flag"] = int(record["id"]) % 2 return record def before(self): """任务开始前要执行的操作, 如初始化任务表,创建目标表等""" sql = "create table destination_table(id int, name varchar(100))" self.writer.db.execute(sql) def after(self): """任务完成后要执行的操作,如更新任务状态等""" sql = "update task set status='done' where name='new_task'" self.writer.db.execute(sql) NewTask().start()
目前已实现Reader和Writer列表
Reader | 介绍 |
---|---|
DatabaseReader | 支持所有关系型数据库的读取 |
FileReader | 结构化文本数据读取,如csv文件 |
ExcelReader | Excel表文件读取 |
Writer | 介绍 |
---|---|
DatabaseWriter | 支持所有关系型数据库的写入 |
ElasticSearchWriter | 批量写入数据到es索引 |
HiveWriter | 批量插入hive表 |
HiveWriter2 | Load data方式导入hive表(推荐) |
FileWriter | 写入数据到文本文件 |
项目地址pyetl
总结
到此这篇关于python ETL工具 pyetl的文章就介绍到这了,更多相关python ETL工具 pyetl内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!
龙虎牛熊多头合约池 接口名称 long_pool 接口描述 龙虎牛熊多头合约池接口 请求参数 参数名 说明 举例 date 查询日期 2018-08-08 返回参数 参数名 类型 说明 symbol string 品种编码 code string 合约代号 示例代码 from akshare import pro_api pro = pro_api(token="在此处输入您的token,可以通过
工具 客户端 客户端分为三种:完整客户端、轻量级客户端和在线客户端。 完整客户端:存储所有的交易历史记录,功能完备; 轻量级客户端:不保存交易副本,交易需要向别人查询; 在线客户端:通过网页模式来浏览第三方服务器提供的服务。 钱包 矿机 专门为“挖矿”设计的硬件,包括基于 GPU 和 ASIC 的芯片。 脚本 比特币交易支持一种比较简单的脚本语言(类 Forth 的栈脚本语言),可以写入 UTXO
工具 以下的一些工具可以帮助你自动检查项目中的 Ruby 代码是否符合这份指南。 RuboCop [RuboCop][] 是一个基于本指南的 Ruby 代码风格检查工具。RuboCop 涵盖了本指南相当大的部分,其同时支持 MRI 1.9 和 MRI 2.0,且与 Emacs 整合良好。 RubyMine RubyMine 的代码检查部分基于本指南。
10.7. 工具 本章剩下的部分将讨论Go语言工具箱的具体功能,包括如何下载、格式化、构建、测试和安装Go语言编写的程序。 Go语言的工具箱集合了一系列的功能的命令集。它可以看作是一个包管理器(类似于Linux中的apt和rpm工具),用于包的查询、计算包的依赖关系、从远程版本控制系统下载它们等任务。它也是一个构建系统,计算文件的依赖关系,然后调用编译器、汇编器和链接器构建程序,虽然它故意被设计成
vse命令行工具 yocode扩展生成器 范例
提供各种支付需要的配置生成方法。 配置 <?php use EasyWeChat\Pay\Application; $config = [...]; $app = new Application($config); $utils = $app->getUtils(); 注意 生成支付 JS 配置 有四种发起支付的方式:WeixinJSBridge, JSSDK, 小程序支付, APP We
CoreOS 内置了 服务发现,容器管理 工具。 服务发现 CoreOS 的第一个重要组件就是使用 etcd 来实现的服务发现。在 CoreOS 中 etcd 默认以 rkt 容器方式运行。 etcd 使用方法请查看 etcd 章节。 容器管理 第二个组件就是 Docker,它用来运行你的代码和应用。CoreOS 内置 Docker,具体使用请参考本书其他章节。
由于 OpenStack 自身的各个组件都是松耦合关系,常见的部署也都是分布式部署,造成要深入体会 Neutron 的工作过程,或者进行故障诊断,往往涉及到多个组件,是否繁琐。 本章介绍一些工具,提高操作和开发的效率。