当前位置: 首页 > 工具软件 > Luigi > 使用案例 >

luigi 初探

尉迟跃
2023-12-01
import luigi


class InputText(luigi.ExternalTask):
    """External input: a dated text file produced outside this pipeline.

    Because the file is created by some other process, this task has no
    ``run`` logic; it only declares where the file is expected to be.
    """
    date = luigi.DateParameter()

    def output(self):
        """Return the local target for the text file belonging to ``date``."""
        path = self.date.strftime('/var/tmp/text/%Y-%m-%d.txt')
        return luigi.LocalTarget(path)


class WordCount(luigi.Task):
    """Count word occurrences across all daily text files in a date interval.

    One ``InputText`` is required per date in ``date_interval``. The result
    is a single local file, named after the interval, holding one
    ``word<TAB>count`` line per distinct whitespace-separated word.
    """
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        """Return one ``InputText`` dependency for each date in the interval."""
        return [InputText(date) for date in self.date_interval.dates()]

    def output(self):
        """Return the local target that receives the aggregated counts."""
        return luigi.LocalTarget('/var/tmp/text-count/%s' % self.date_interval)

    def run(self):
        """Read every input file, tally its words and write the totals."""
        counts = {}
        # input() wraps requires() and yields the Target of each dependency.
        for target in self.input():
            # Targets abstract the file system/format; open() gives a stream.
            # The with-block closes each input stream once it has been read.
            with target.open('r') as infile:
                for line in infile:
                    # split() with no argument already discards surrounding
                    # whitespace and collapses runs of it.
                    for word in line.split():
                        counts[word] = counts.get(word, 0) + 1

        # Closing the output is essential (the write is atomic and finalized
        # on close), so let the context manager guarantee it.
        with self.output().open('w') as outfile:
            # items() works on both Python 2 and 3 (iteritems() is 2-only);
            # `total` avoids rebinding the dict while it is being iterated.
            for word, total in counts.items():
                outfile.write("%s\t%d\n" % (word, total))


# Script entry point: only runs when the file is executed directly, not when
# it is imported. Hands control to Luigi with WordCount as the main task.
if __name__ == '__main__':
    luigi.run(main_task_cls=WordCount)
 类似资料: