1. Installing luigi on a Mac
pip install luigi
pip install boto3 (luigi depends on boto3)
2. Basic concepts
import random
from collections import defaultdict

import luigi


class Streams(luigi.Task):
    """
    Faked version right now, just generates bogus data.
    """
    date = luigi.DateParameter()

    def run(self):
        """
        Generates bogus data and writes it into the :py:meth:`~.Streams.output` target.
        """
        with self.output().open('w') as output:
            for _ in range(1000):
                output.write('{} {} {}\n'.format(
                    random.randint(0, 999),
                    random.randint(0, 999),
                    random.randint(0, 999)))

    def output(self):
        """
        Returns the target output for this task.
        In this case, a successful execution of this task will create a file in the local file system.

        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.LocalTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))


class AggregateArtists(luigi.Task):
    """
    This task runs over the target data returned by :py:meth:`~.Streams.output` and
    writes the result into its :py:meth:`~.AggregateArtists.output` target (local file).
    """
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        """
        Returns the target output for this task.
        In this case, a successful execution of this task will create a file on the local filesystem.

        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.LocalTarget("data/artist_streams_{}.tsv".format(self.date_interval))

    def requires(self):
        """
        This task's dependencies:

        * :py:class:`~.Streams`

        :return: list of object (:py:class:`luigi.task.Task`)
        """
        # One Streams task per date in the interval.
        return [Streams(date) for date in self.date_interval]

    def run(self):
        # Count how many stream lines mention each artist, across all input files.
        artist_count = defaultdict(int)
        for t in self.input():
            with t.open('r') as in_file:
                for line in in_file:
                    _, artist, track = line.strip().split()
                    artist_count[artist] += 1
        # Python 3 dicts have .items(), so six.iteritems is no longer needed.
        with self.output().open('w') as out_file:
            for artist, count in artist_count.items():
                out_file.write('{}\t{}\n'.format(artist, count))
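run() is what this task actually does when it executes.
requires() returns the tasks this task depends on; here it depends on a series of Streams tasks, one per date in the interval.
output() returns this task's output target.
input() returns the outputs produced by the tasks this one depends on.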
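To make the relationship between these four methods concrete, here is a toy, single-process model in plain Python, so it runs without luigi installed. ToyTask, ToyStreams, ToyAggregate and build are hypothetical names, and the real luigi scheduler is considerably more involved. The sketch only mirrors two behaviours luigi does have: dependencies run before the task that requires them, and a task whose output already exists counts as complete and is skipped. Outputs are keys in a dict standing in for files.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task, keeping only requires/output/run/input."""

    def requires(self):
        return []  # no dependencies by default

    def output(self):
        raise NotImplementedError

    def run(self, store):
        raise NotImplementedError

    def input(self):
        # The outputs of the tasks returned by requires() (list case only).
        return [dep.output() for dep in self.requires()]


def build(task, store, log):
    """Build dependencies first, then run `task` unless its output already exists."""
    for dep in task.requires():
        build(dep, store, log)
    if task.output() in store:
        log.append("skip " + task.output())  # output exists: treated as complete
        return
    task.run(store)
    log.append("run " + task.output())


class ToyStreams(ToyTask):
    def __init__(self, day):
        self.day = day

    def output(self):
        return "streams_" + self.day

    def run(self, store):
        store[self.output()] = ["u1 a1 t1", "u2 a2 t2"]  # two fake stream lines


class ToyAggregate(ToyTask):
    def __init__(self, days):
        self.days = days

    def requires(self):
        return [ToyStreams(day) for day in self.days]

    def output(self):
        return "aggregate_" + "_".join(self.days)

    def run(self, store):
        # input() hands back the upstream outputs, one per ToyStreams task.
        store[self.output()] = sum(len(store[name]) for name in self.input())


store = {"streams_d1": ["u9 a9 t9"]}  # pretend d1 was produced by an earlier run
log = []
build(ToyAggregate(["d1", "d2"]), store, log)
print(log)                       # ['skip streams_d1', 'run streams_d2', 'run aggregate_d1_d2']
print(store["aggregate_d1_d2"])  # 3
```

Calling build a second time with the same store would only log skips. The same idea is why rerunning a luigi pipeline does not redo tasks whose output files are already on disk.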
3. Using the central scheduler
First, start the luigid server:
luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE>
Then run the task, for example:
luigi --module top_artists2 Top10Artists --date-interval 2012-06
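Note: leave out --local-scheduler; with that flag the task uses an in-process scheduler and never contacts luigid.
You can then open localhost:8082 in a browser to see the current tasks.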
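Because the task now has to reach luigid instead of an in-process scheduler, it can help to check that the server is actually listening before launching work. This is a minimal standard-library sketch, assuming luigid runs on localhost on port 8082 (the port used above). scheduler_is_up is a hypothetical helper, and a successful connection only shows that something is listening on that port, not that it is luigid.

```python
import socket


def scheduler_is_up(host="localhost", port=8082, timeout=1.0):
    """Return True if something accepts TCP connections on host:port.

    This only proves a listener exists; it does not confirm it is luigid.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    print("luigid reachable:", scheduler_is_up())
```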
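If A -> B, meaning A depends on B, then A can use B's output directly through input(). If B's output is several files, then input() in A is also several files, and you can read them with a for loop.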
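input() does not have to be a single target. As far as I can tell, luigi builds it by walking whatever requires() returns and replacing each task with its output() (the helper is getpaths in luigi.task). A single task therefore gives a single target, a list gives a list, and a dict gives a dict with the same keys. The sketch below re-implements that mapping in plain Python and then reads several upstream files with a for loop. outputs_of and FakeTask are hypothetical names, and paths are plain strings where luigi would hand back Target objects.

```python
import os
import tempfile


class FakeTask:
    """Hypothetical task whose output() is a plain path (luigi would return a Target)."""

    def __init__(self, path):
        self.path = path

    def output(self):
        return self.path


def outputs_of(struct):
    """Replace every task in a requires()-style structure with its output()."""
    if isinstance(struct, dict):
        return {key: outputs_of(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        return type(struct)(outputs_of(item) for item in struct)
    return struct.output()  # a single task maps to a single output


# Two upstream "B" tasks, each having written one small file.
tmp_dir = tempfile.mkdtemp()
deps = []
for day, rows in [("01", "a\nb\n"), ("02", "c\n")]:
    path = os.path.join(tmp_dir, "streams_" + day + ".tsv")
    with open(path, "w") as f:
        f.write(rows)
    deps.append(FakeTask(path))

inputs = outputs_of(deps)  # list in, list out
lines = []
for path in inputs:        # read every upstream file in turn
    with open(path) as f:
        lines.extend(line.strip() for line in f)
print(lines)  # ['a', 'b', 'c']

named = outputs_of({"streams": FakeTask("s.tsv"), "users": FakeTask("u.tsv")})
print(named["users"])  # u.tsv
```

Returning a dict from requires() is handy when the upstream outputs play different roles, since A can then pick each one by name instead of relying on list order.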