当前位置: 首页 > 工具软件 > Luigi > 使用案例 >

luigi任务调度框架

洪星文
2023-12-01

原文:

http://blog.kissdata.com/2014/05/28/lugi.html

Luigi

介绍

Luigi是基于python语言的,可帮助建立复杂流式批处理任务管理系统。它主要提供了以下功能:任务依赖管理、工作流管理、任务可视化、错误故障处理机制、命令行交互等。Luigi的主要目的是为了解决需要长期运行的流式批处理任务的管理。你可以链接很多个任务,使它们自动化,并进行故障管理。上面所说的任务可以是任何类型的任务,通常来说有如下几种:Hadoop任务、从数据库导入或导出、机器学习算法训练等。

API概览

在Luigi中有两个基础类:Task, Target。 另外,Parameter类对于如何控制Task类的运行是一个重要的类。

Target

广义地讲,Target可对应为磁盘上的文件,或HDFS上文件,或checkpoint点,或数据库等。对于Target来说,唯一需要实现的方法为exists,返回为True表示存在,否则不存在返回为False. 在实际应用时,写一个Target子类是很少需要用到的。直接使用开箱即可用的LocalTarget及 hdfs.HdfsTarget类就够用了。Luigi提供了Gzip支持,通过参数format=format.Gzip即可。

Task

Task是任务逻辑运行的地方,提供了一些方法来定义任务的逻辑行为,主要有run, output, requires.

Task通过类名及参数值做为标识符进行唯一区分。实际上,在同一个worker中,两个拥有相同类名及相同参数值的task不单单只是equal,而且实际上还是同一个实例。然而,如果参数在构建声明时指定了参数 significant=False ,对于Task的标识是不起影响的。对于多个Task,它们的类名相同,只是指定了 significant=False 的参数值才不同,而未指定 significant=False 的参数值是相同的,对于这些Task来说,它们拥有相同的标识符,即 hash(taskA) == hash(taskB) 是True的,但它们来自于不同的实例。

  • Task.requires

    requires方法用来指定依赖关系,除了可指定对其他Task的依赖,还可指定为对自身Task的依赖。requires返回值可为 dicts/lists/tuples 或其他类别的封装。

  • Task.output

    output方法返回一个或多个的Target对象,类似于requires方法,可返回适应于实际需要的对于Target的任何封装。实际上,建议只返回一个Target,因为如果返回多个,atomicity将会被丢失,除非Task能够确保多个Target能被原子性地创建。当然,如果原子性不是非常重要的时候,那么就可以放心地返回多个Target。

  • Task.run

    run方法包含实际真正执行的代码。注意到,Luigi将任何事情切分为两个阶段,首先它指出在tasks之间的依赖关系,然后它运行每一件事情。 input() 方法是一个内部帮助方法,用来替代在requires 中的对象的对应输出。

Parameter

在Python语言中,参数通常是在constructor时提供,但Luigi要求在类级别上声明所需的参数。通过这样子的要求,Luigi通过处理这些模板规范化的代码来为constructor提供所需参数。

Python是个无需指定类型的语言(Python是个强类型动态语言),对于参数无需指定类型。对于Luigi来说,可以简单地使用 luigi.Parameter 即可,之所以存在 DateParameter 的原因,是为了在命令行交互时,确保命令行参数的值可以转换为对应的类型。

Events and callbacks

Luigi内置了事件系统,允许注册callback到event中,并触发它们在所定义的tasks中,可挂接到一些预定义好的事件中,或者自定义事件中。每一个event被绑定到一个Task类中,将会被该Task类或其子类所触发。

Instance caching

对于实例,Luigi提供了Instance caching。对于同一个标识符的task,就算在代码中实例化创建了两次,但实际上只会创建一个实例,这个是有必要的,确保了task只会被执行一次。

Execution Model

Luigi拥有一个非常简单的运行模型,最重要的一个方面就是没有执行转移。当执行一个Luigi的工作流,worker调度所有的tasks,并在同样的这些进程内执行这些tasks。受益于这种模式,非常容易对所有执行任务进行debug。并且,开发过程也相当简单。在开发过程中,通常通过命令行来运行Luigi,而当你布署时,可以通过crontab或任何其他的调度器来调度。这种模式所带来的不好的地方,在于Luigi不能自由地进行扩展,不过Luigi认为扩展应该交给Task去实现;另外一个不好的地方就是Luigi需要依赖于外部的调度器来触发工作流,如crontab等。

Lugic Patterns

Code Reuse

Luigi的一个好处,是非常容易依赖于其他库中所定义的tasks。在执行路径上非常容易进行分叉,其中一项任务的输出可以成为很多其他任务的输入。

同时,Luigi任务的输出都将被无限期地保存。这点的好处就是当后面的任务失败时,在重跑失败任务时可以重用前面任务的输出,而不需要重跑前面的任务。不好的地方在于,将会有大量的中间结果保存在系统上,一个比较有用的建议就是把这些输出保存在一个特定的目录中,并进行定期地清除。

Triggering Many Tasks

一个常见的用例是每晚要运行一个Hadoop任务,但有时因为各种原因该任务会失败。一个有用的模式就是在最后建立一个虚拟任务,仅需声明依赖于最近多天之间的实际真正的任务。

Configuration

所有的配置均可由两个配置文件进行指定,一个是在当前工作目录下的 client.cfg ,另一个则为 /etc/luigi 。当前工作目录下的 client.cfg 高于 /etc/luigi 。

配置选项有:

  1. default-scheduler-host : 默认的scheduler
  2. error-email : 当crash时会收到eamil,但在命令行下运行时则没有。
  3. luigi-history : 如果设置了该选项,值为一文件名,将为记录一些东西(当前仅有job id)在mapreduce任务的输出目录下。

如果想在Python下运行Hadoop mapreduce任务,需要指定streaming jar的路径。

发送error告警邮件的配置示例

为了能够在发生error时能够发送邮件,得满足两个条件:一个是输出不能从终端直接输出,得重定向到其他地方;另外一个是在配置文件中进行了相关配置,配置示例如下。

  1. [core]
  2. error-email: receiver@xxx.com
  3. email-sender: sender@xxx.com
  4. smtp_ssl: False
  5. smtp_host: smtp.xxx.com
  6. smtp_port: 25
  7. smtp_login: sender@xxx.com
  8. smtp_password: sender_password

日志配置示例

如果不配置一个日志配置文件的话,会经常报一个提示:No handlers could be found for logger “luigi-interface” 。

其中一种配置使用方式如下: 在工程根目录下建立一个 conf 目录,在其下生成 logging.conf 文件,内容如下:

  1. [formatters]
  2. keys=simple,detail
  3. [handlers]
  4. keys=file_rotate
  5. [loggers]
  6. keys=root
  7. [formatter_simple]
  8. format=%(levelname)s: %(message)s
  9. [formatter_detail]
  10. format=%(asctime)s %(levelname)s [%(filename)s %(lineno)d]: %(message)s
  11. [handler_file_rotate]
  12. formatter=detail
  13. class=handlers.RotatingFileHandler
  14. args=(os.environ['MAIN_ROOT_DIR'] + "/logs/main.log", 'a', 1024 * 1024 * 500, 1)
  15. [logger_root]
  16. handlers=file_rotate
  17. #level=DEBUG
  18. level=INFO
  19. #level=WARNING

在入口执行的py文件中放入以下代码:

  1. #### MAIN_ROOT_DIR is used in loggin.conf for relative log file path
  2. MAIN_ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
  3. os.environ['MAIN_ROOT_DIR'] = MAIN_ROOT_DIR
  4. loggingConfigFile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf/logging.conf')
  5. logging.config.fileConfig( loggingConfigFile, disable_existing_loggers = False)
  6. logger = logging.getLogger( __name__ )

现阶段不足

  • Luigi关注于批处理任务,所以对于实时流式处理及长时间一直运行的处理的帮助不大。
  • Luigi假设每一个task是工作中的相当大的一块数据处理。对于调度几千个任务是可以的,但将其扩展到数万个任务的调度是不太可行的。可通过把不同类别的任务拆分成组,同一组或多组的tasks跑在一个Luigi实例上,通过布多个Luigi实例来解决此问题。
  • Luigi对于任务调度及任务执行之间进行了严格地划分。动态的for循环及分支在Luigi中是不太容易实现的,举个例子,遍历一个数值计算任务直到它收敛是非常棘手的。

花边

Luigi是世界上最有名排第二的水管工的名字,中文译名为路易奇或路奇,有着“永远的老二”的外号。至于第一是谁,想必大家都猜到了吧 ^_^

代码示例

依赖任务的执行顺序

当在 Task.requires 下返回多个依赖的task时,在return语句中,越排在后面的反而先执行,代码示例如下:

  1. class Z(luigi.Task):
  2. def requires(self):
  3. return S(), P() ### 在执行时, 先执行 P 任务, 之后再执行 S 任务。所以当对依赖任务的执行顺序有要求时,请注意这里的排列顺序

以daemon方式启动luigid

启动server并以daemon方式运行的shell命令例子,注意将其中的一些路径换成实际路径,不要放在/tmp目录下,以免当luigid实例被kill掉时,所保存的log及state文件因重启系统而丢失了:

  1. luigid --pidfile /tmp/luigid.pid --logdir /tmp/luigi/log/ --state-path /tmp/luigi/state --background

向luigid提交task

在central scheduler模式下,提交task到server中的代码示例:

  1. sch = luigi.rpc.RemoteScheduler(host=..., port=...)
  2. w = luigi.worker.Worker(scheduler=sch)
  3. w.add(task)
  4. w.run()

将本地文件put到hdfs上

  1. import luigi, luigi.hdfs
  2. from datetime import datetime
  3. ## 本示例将本地的 /tmp/abc_%Y%m%d.txt 文件上传到 HDFS 上的 /test/abc_%Y%m%d 路径
  4. class BBF(luigi.ExternalTask):
  5. date = luigi.Parameter()
  6. def output(self):
  7. date = datetime.strptime(self.date, '%Y-%m-%d')
  8. return luigi.LocalTarget( date.strftime("/tmp/abc_%Y%m%d.txt") )
  9. class BF(luigi.Task):
  10. date = luigi.Parameter()
  11. def requires(self):
  12. return BBF(self.date)
  13. def output(self):
  14. date = datetime.strptime(self.date, '%Y-%m-%d')
  15. return luigi.hdfs.HdfsTarget(date.strftime("/test/abc_%Y%m%d.txt"))
  16. def run(self):
  17. hdfsClient = luigi.hdfs.HdfsClientApache1()
  18. hdfsClient.put( self.input().path, self.output().path )

执行hive语句

  1. import luigi, luigi.hdfs, luigi.hive
  2. from datetime import datetime
  3. ## 本示例是在hive上执行建立表分区的语句, 在 test 库的 abc 表上建立三个字段 pyearmonth, pday, pappid 对应的分区,并指向hdfs上的目录路径为 /test/abc 所对应的具体分区路径下
  4. class P(luigi.hive.HiveQueryTask):
  5. date = luigi.Parameter()
  6. appid = luigi.Parameter()
  7. def output(self):
  8. date = datetime.strptime(self.date, '%Y-%m-%d')
  9. return luigi.hive.HivePartitionTarget(table='abc', partition={'pyearmonth':date.strftime("%Y%m"), 'pday':date.strftime("%d"), 'pappid':self.appid}, database='test')
  10. def query(self):
  11. date = datetime.strptime(self.date, '%Y-%m-%d')
  12. return """
  13. USE test;
  14. ALTER TABLE abc ADD IF NOT EXISTS PARTITION ( pyearmonth='{ym}', pday='{d}', pappid='{appid}' ) LOCATION '/test/abc/pyearmonth={ym}/pday={d}/pappid={appid}';
  15. """.format(ym=date.strftime("%Y%m"), d=date.strftime("%d"), appid=self.appid)

扩展

有时luigi的一些特性未能很好地满足业务的特性,这时就需要自己对其进行扩展了,下面是一些扩展的例子。

  1. #### 可加jar包的HivePartitionTarget
  2. #!/usr/bin/env python
  3. #-*- coding:utf8 -*-
  4. import logging
  5. logger = logging.getLogger('luigi-interface')
  6. import re
  7. import luigi, luigi.hive
  8. class HivePartitionTarget(luigi.Target):
  9. """ exists returns true if the table's partition exists """
  10. def __init__(self, database, table, partition, jar=None):
  11. self.database = database
  12. self.table = table
  13. if not isinstance(partition, dict):
  14. raise Exception("hiveUtil.HiveTablePartitionTarget init parameter partition not proper, it's type should be dict")
  15. self.partition = ','.join(["{0}='{1}'".format(k, v) for (k, v) in partition.items()])
  16. if not isinstance(jar, (type(None), str, list, tuple, set)):
  17. raise Exception("hiveUtil.HiveTablePartitionTarget init parameter jar not proper, it's type should be None, str, list, tuple or set")
  18. jarStr = ""
  19. if jar is not None:
  20. if isinstance(jar, str):
  21. jarStr = "ADD JAR {jarPath};".format(jarPath=jar)
  22. elif isinstance(jar, (list, tuple, set)):
  23. for j in jar:
  24. jarStr = "{oldStr} ADD JAR {jarPath};".format(
  25. oldStr = jarStr,
  26. jarPath = j,
  27. )
  28. self.jar = jarStr
  29. def exists(self):
  30. try:
  31. logger.debug("Checking Hive table '{d}.{t}' for partition {p}".format(d=self.database, t=self.table, p=str(self.partition)))
  32. stdout = luigi.hive.run_hive_cmd("""
  33. USE {db};
  34. {jarStr};
  35. SHOW PARTITIONS {table} PARTITION ({partition})
  36. """.format(
  37. db=self.database,
  38. jarStr=self.jar,
  39. table=self.table,
  40. partition=self.partition,
  41. ),
  42. True)
  43. if stdout:
  44. return True
  45. else:
  46. return False
  47. except Exception, e:
  48. raise e
  49. """ Uses `hive` invocations to find information """
  50. def table_location(self):
  51. cmd = "USE {db}; {jarStr}; DESCRIBE FORMATTED {table} PARTITION ({partition})".format(
  52. db=self.database,
  53. jarStr=self.jar,
  54. table=self.table,
  55. partition=self.partition,
  56. )
  57. stdout = luigi.hive.run_hive_cmd(cmd, True)
  58. for line in stdout.split("\n"):
  59. if "Location:" in line:
  60. return line.split("\t")[1]
  61. @property
  62. def path(self):
  63. """Returns the path for this HiveTablePartitionTarget's data"""
  64. location = self.table_location()
  65. if not location:
  66. raise Exception("Couldn't find location for table: {0}".format(str(self)))
  67. if location.startswith( 'hdfs' ):
  68. tp = re.compile( r'(hdfs://[^/]+)(.*)' )
  69. tg = tp.match( location )
  70. if tg:
  71. location = tg.groups()[1]
  72. return location
  73. def open(self, mode):
  74. return NotImplementedError("open() is not supported for HivePartitionTarget")

使用示例,假设上面的代码存为 hiveUtil.py 文件,需要添加的jar包放在当前目录下的 lib目录下

  1. import hiveUtil
  2. import luigi
  3. class T(luigi.Task):
  4. ...
  5. def output(self):
  6. ...
  7. return hiveUtil.HivePartitionTarget(table='table_name', partition={'pyearmonth':'201406', 'pday':'06'}, database='db_name', jar={'lib/a.jar', 'lib/b.jar'})

安装

前置环境安装

  1. sudo apt-get install build-essential python-dev python-daemon python-setuptools libcurl4-gnutls-dev librtmp-dev

依赖包安装

以下包的安装,后面括号内空为安装步骤的简单记录:

  1. pycares (./build_inplace; python setup.py build; sudo python setup.py install)
  2. pycurl (python setup.py build; sudo python setup.py install)
  3. unittest2 (python setup.py build; sudo python setup.py install)
  4. futures (python setup.py build; sudo python setup.py install)
  5. Monotime (python setup.py build; sudo python setup.py install)
  6. Twisted (sudo python setup.py install)
  7. backports.ssl_match_hostname (python setup.py build; sudo python setup.py install)
  8. tornado (python setup.py build; sudo python setup.py install)
  9. mechanize (python setup.py build; sudo python setup.py install)
  10. simplejson (这个可提高 luigi 的json性能,在luigi源码中有时在使用json时要是有simplejson则使用simplejson模块,否则使用自带的json库,但自带的json库性能比较差)
  11. luigi ( 从 github上下载zip包,解压开之后, sudo python setu.py install )
  12. Connector/Python (sudo python setup.py install) (这个是mysql官方的python驱动,当需要使用 luigi.contrib.mysqldb 时,需要安装此模块)
  13. MySQLdb (sudo python setup.py install ) (在安装它之前,需要先安装 sudo apt-get install libmysqlclient-dev python-setuptools)(此为选装组件,做为python的mysql驱动,可支持load data local infile,在打开连接时需要加上local_infile参数,MySQLdb.connect(..., local_infile=1) )

 类似资料: