Luigi:
author: vincentzhwg@gmail.com
date: 2014.5.6
blog已迁移,最新的Luigi教程更新在:http://guan58.com/archives/38
### web
https://github.com/spotify/luigi
### intro
Luigi是基于python语言的,可帮助建立复杂流式批处理任务管理系统。它主要提供了以下功能:任务依赖管理、工作流管理、任务可视化、错误故障处理机制、命令行交互等。Luigi的主要目的是为了解决需要长期运行的流式批处理任务的管理。你可以链接很多个任务,使它们自动化,并进行故障管理。上面所说的任务可以是任何类型的任务,通常来说有如下几种:Hadoop任务、从数据库导入或导出、机器学习算法训练等。
### API概览
在Luigi中有两个基础类:Task, Target. 另外,Parameter类对于如何控制Task类的运行是一个重要的类。
Target:
广义地讲,Target可对应为磁盘上的文件,或HDFS上文件,或checkpoint点,或数据库等。对于Target来说,唯一需要实现的方法为exists,返回为True表示存在,否则不存在返回为False.
在实际应用时,写一个Target子类是很少需要用到的。直接使用开箱即可用的LocalTarget及 hdfs.HdfsTarget类就够用了。Luigi提供了Gzip支持,通过参数format=format.Gzip即可。
Task:
Task是任务逻辑运行的地方,提供了一些方法来定义任务的逻辑行为,主要有run, output, requires.
Task通过类名及参数值做为标识符进行唯一区分。实际上,在同一个worker中,两个拥有相同类名及相同参数值的task不单单只是equal,而且实际上还是同一个实例。然而,如果参数在构建声明时指定了参数 significant=False ,对于Task的标识是不起影响的。对于多个Task,它们的类名相同,只是指定了 significant=False 的参数值才不同,而未指定 significant=False 的参数值是相同的,对于这些Task来说,它们拥有相同的标识符,即 hash(taskA) == hash(taskB) 是True的,但它们来自于不同的实例。
Task.requires:
requires方法用来指定依赖关系,除了可指定对其他Task的依赖,还可指定为对自身Task的依赖。requires返回值可为 dicts/lists/tuples 或其他类别的封装。
Task.output:
output方法返回一个或多个的Target对象,类似于requires方法,可返回适应于实际需要的对于Target的任何封装。实际上,建议只返回一个Target,因为如果返回多个,atomicity将会被丢失,除非Task能够确保多个Target能被原子性地创建。当然,如果原子性不是非常重要的时候,那么就可以放心地返回多个Target。
Task.run:
run方法包含实际真正执行的代码。注意到,Luigi将任何事情切分为两个阶段,首先它指出在tasks之间的依赖关系,然后它运行每一件事情。 input() 方法是一个内部帮助方法,用来替代在requires 中的对象的对应输出。
Parameter:
在Python语言中,参数通常是在constructor时提供,但Luigi要求在类级别上声明所需的参数。通过这样子的要求,Luigi通过处理这些模板规范化的代码来为constructor提供所需参数。
Python是个无需指定类型的语言(Python是个强类型动态语言),对于参数无需指定类型。对于Luigi来说,可以简单地使用 luigi.Parameter 即可,之所以存在 DateParameter 的原因,是为了在命令行交互时,确保命令行参数的值可以转换为对应的类型。
Events and callbacks:
Luigi内置了事件系统,允许注册callback到event中,并触发它们在所定义的tasks中,可挂接到一些预定义好的事件中,或者自定义事件中。每一个event被绑定到一个Task类中,将会被该Task类或其子类所触发。
Instance caching:
对于实例,Luigi提供了Instance caching。对于同一个标识符的task,就算在代码中实例化创建了两次,但实际上只会创建一个实例,这个是有必要的,确保了task只会被执行一次。
### Execution Model
Luigi拥有一个非常简单的运行模型,最重要的一个方面就是没有执行转移。当执行一个Luigi的工作流,worker调度所有的tasks,并在同样的这些进程内执行这些tasks。受益于这种模式,非常容易对所有执行任务进行debug。并且,开发过程也相当简单。在开发过程中,通常通过命令行来运行Luigi,而当你布署时,可以通过crontab或任何其他的调度器来调度。这种模式所带来的不好的地方,在于Luigi不能自由地进行扩展,不过Luigi认为扩展应该交给Task去实现;另外一个不好的地方就是Luigi需要依赖于外部的调度器来触发工作流,如crontab等。
### Lugic Patterns
Code Reuse
Luigi的一个好处,是非常容易依赖于其他库中所定义的tasks。在执行路径上非常容易进行分叉,其中一项任务的输出可以成为很多其他任务的输入。
同时,Luigi任务的输出都将被无限期地保存。这点的好处就是当后面的任务失败时,在重跑失败任务时可以重用前面任务的输出,而不需要重跑前面的任务。不好的地方在于,将会有大量的中间结果保存在系统上,一个比较有用的建议就是把这些输出保存在一个特定的目录中,并进行定期地清除。
Triggering Mang Tasks
一个常见的用例是每晚要运行一个Hadoop任务,但有时因为各种原因该任务会失败。一个有用的模式就是在最后建立一个虚拟任务,仅需声明依赖于最近多天之间的实际真正的任务。
### Configuration
所有的配置均可由两个配置文件进行指定,一个是在当前工作目录下的 client.cfg ,另一个则为 /etc/luigi 。当前工作目录下的 client.cfg 高于 /etc/luigi 。
配置选项有:
default-scheduler-host : 默认的scheduler
error-email : 当crash时会收到eamil,但在命令行下运行时则没有。
luigi-history : 如果设置了该选项,值为一文件名,将为记录一些东西(当前仅有job id)在mapreduce任务的输出目录下。
如果想在Python下运行Hadoop mapreduce任务,需要指定streaming jar的路径。
发送error告警邮件的配置示例:
为了能够在发生error时能够发送邮件,得满足两个条件:一个是输出不能从终端直接输出,得重定向到其他地方;另外一个是在配置文件中进行了相关配置,配置示例如下。
[core]
error-email: receiver@xxx.com
email-sender: sender@xxx.com
smtp_ssl: False
smtp_host: smtp.xxx.com
smtp_port: 25
smtp_login: sender@xxx.com
smtp_password: sender_password
日志配置示例:
如果不配置一个日志配置文件的话,会经常报一个提示:No handlers could be found for logger "luigi-interface" 。其中一种配置使用方式如下:
在工程根目录下建立一个 conf 目录,在其下生成 logging.conf 文件,内容如下:
[formatters]
keys=simple,detail
[handlers]
keys=file_rotate
[loggers]
keys=root
[formatter_simple]
format=%(levelname)s: %(message)s
[formatter_detail]
format=%(asctime)s %(levelname)s [%(filename)s %(lineno)d]: %(message)s
[handler_file_rotate]
formatter=detail
class=handlers.RotatingFileHandler
args=(os.environ['MAIN_ROOT_DIR'] + "/logs/main.log", 'a', 1024 * 1024 * 500, 1)
[logger_root]
handlers=file_rotate
#level=DEBUG
level=INFO
#level=WARNING
在入口执行的py文件中放入以下代码:
#### MAIN_ROOT_DIR is used in loggin.conf for relative log file path
MAIN_ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
os.environ['MAIN_ROOT_DIR'] = MAIN_ROOT_DIR
loggingConfigFile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf/logging.conf')
logging.config.fileConfig( loggingConfigFile, disable_existing_loggers = False)
logger = logging.getLogger( __name__ )
### 现阶段不足
Luigi关注于批处理任务,所以对于实时流式处理及长时间一直运行的处理的帮助不大。
Luigi假设每一个task是工作中的相当大的一块数据处理。对于调度几千个任务是可以的,但将其扩展到数万个任务的调度是不太可行的。可通过把不同类别的任务拆分成组,同一组或多组的tasks跑在一个Luigi实例上,通过布多个Luigi实例来解决此问题。
Luigi对于任务调度及任务执行之间进行了严格地划分。动态的for循环及分支在Luigi中是不太容易实现的,举个例子,遍历一个数值计算任务直到它收敛是非常棘手的。
### 花边
Luigi是世界上最有名排第二的水管工的名字,中文译名为路易奇或路奇,有着“永远的老二”的外号。至于第一是谁,想必大家都猜到了吧 ^_^
### 代码示例
当在 Task.requires 下返回多个依赖的task时,在return语句中,越排在后面的反而先执行,代码示例如下:
class Z(luigi.Task):
def requires(self):
return S(), P() ### 在执行时, 先执行 P 任务, 之后再执行 S 任务。所以当对依赖任务的执行顺序有要求时,请注意这里的排列顺序。
启动server并以daemon方式运行的shell命令例子,注意将其中的一些路径换成实际路径,不要放在/tmp目录下,以免当luigid实例被kill掉时,所保存的log及state文件因重启系统而丢失了:
luigid --pidfile /tmp/luigid.pid --logdir /tmp/luigi/log/ --state-path /tmp/luigi/state --background
在central scheduler模式下,提交task到server中的代码示例:
sch = luigi.rpc.RemoteScheduler(host=..., port=...)
w = luigi.worker.Worker(scheduler=sch)
w.add(task)
w.run()
将本地文件put到hdfs上的代码示例:
import luigi, luigi.hdfs
from datetime import datetime
## 本示例将本地的 /tmp/abc_%Y%m%d.txt 文件上传到 HDFS 上的 /test/abc_%Y%m%d 路径
class BBF(luigi.ExternalTask):
date = luigi.Parameter()
def output(self):
date = datetime.strptime(self.date, '%Y-%m-%d')
return luigi.LocalTarget( date.strftime("/tmp/abc_%Y%m%d.txt") )
class BF(luigi.Task):
date = luigi.Parameter()
def requires(self):
return BBF(self.date)
def output(self):
date = datetime.strptime(self.date, '%Y-%m-%d')
return luigi.hdfs.HdfsTarget(date.strftime("/test/abc_%Y%m%d.txt"))
def run(self):
hdfsClient = luigi.hdfs.HdfsClientApache1()
hdfsClient.put( self.input().path, self.output().path )
执行hive语句的代码示例:
import luigi, luigi.hdfs, luigi.hive
from datetime import datetime
## 本示例是在hive上执行建立表分区的语句, 在 test 库的 abc 表上建立三个字段 pyearmonth, pday, pappid 对应的分区,并指向hdfs上的目录路径为 /test/abc 所对应的具体分区路径下
class P(luigi.hive.HiveQueryTask):
date = luigi.Parameter()
appid = luigi.Parameter()
def output(self):
date = datetime.strptime(self.date, '%Y-%m-%d')
return luigi.hive.HivePartitionTarget(table='abc', partition={'pyearmonth':date.strftime("%Y%m"), 'pday':date.strftime("%d"), 'pappid':self.appid}, database='test')
def query(self):
date = datetime.strptime(self.date, '%Y-%m-%d')
return """
USE test;
ALTER TABLE abc ADD IF NOT EXISTS PARTITION ( pyearmonth='{ym}', pday='{d}', pappid='{appid}' ) LOCATION '/test/abc/pyearmonth={ym}/pday={d}/pappid={appid}';
""".format(ym=date.strftime("%Y%m"), d=date.strftime("%d"), appid=self.appid)
### 安装
sudo apt-get install build-essential python-dev python-daemon python-setuptools libcurl4-gnutls-dev librtmp-dev
以下包的安装,后面括号内空为安装步骤的简单记录:
pycares (./build_inplace; python setup.py build; sudo python setup.py install)
pycurl (python setup.py build; sudo python setup.py install)
unittest2 (python setup.py build; sudo python setup.py install)
futures (python setup.py build; sudo python setup.py install)
Monotime (python setup.py build; sudo python setup.py install)
Twisted (sudo python setup.py install)
backports.ssl_match_hostname (python setup.py build; sudo python setup.py install)
tornado (python setup.py build; sudo python setup.py install)
mechanize (python setup.py build; sudo python setup.py install)
simplejson (这个可提高 luigi 的json性能,在luigi源码中有时在使用json时要是有simplejson则使用simplejson模块,否则使用自带的json库,但自带的json库性能比较差)
luigi ( 从 github上下载zip包,解压开之后, sudo python setu.py install )