当前位置：首页 > 工具软件 > Luigi > 使用案例

luigi 模板

法兴德
2023-12-01

luigi doc

import os,sys
import luigi
import luigi.contrib.hdfs
from datetime import datetime, timedelta

class DummyTarget(luigi.Target):
    """Target whose existence is fixed by a flag supplied at construction."""

    def __init__(self, exist):
        # Stored unchanged; exists() hands this same value back.
        self.exist = exist

    def exists(self):
        """Return the flag that was passed to the constructor."""
        return self.exist

class CrawlDataInput(luigi.ExternalTask):
    """External task whose output is the HDFS path given by ``input_file``."""

    input_file = luigi.Parameter()

    def output(self):
        """Return the HDFS target if it exists, otherwise ``DummyTarget(False)``."""
        hdfs_target = luigi.contrib.hdfs.HdfsTarget(self.input_file)
        if hdfs_target.exists():
            return hdfs_target
        # Path is absent: hand back a target that always reports "missing".
        return DummyTarget(False)

class CrawlParseBaseData(luigi.Task):
    """Template task that requires ``CrawlDataInput`` and outputs an HDFS target.

    NOTE(review): ``done_base_tag`` is not defined in the visible source; it
    must exist at module level before ``requires()`` or ``output()`` is called.
    NOTE(review): the required input and this task's output use the same
    ``done_base_tag`` path, and ``version`` is not used to build either path --
    confirm that this is intended.
    """

    version = luigi.Parameter()

    def requires(self):
        """Return the external input task for the ``done_base_tag`` path."""
        return CrawlDataInput(input_file=done_base_tag)

    def run(self):
        """Placeholder for the task's work.

        The template originally left this method without a body, which is a
        syntax error that prevents the whole module from being imported.
        """
        # TODO: implement the actual processing and write to self.output().

    def output(self):
        """Return the HDFS target located at ``done_base_tag``."""
        return luigi.contrib.hdfs.HdfsTarget(done_base_tag)


class CrawlDataParserTask(luigi.Task):
    """Template task that runs after ``CrawlParseBaseData`` for the same version.

    NOTE(review): ``done_base_tag`` is not defined in the visible source; it
    must exist at module level before ``output()`` is called.
    NOTE(review): this task's output path is the same ``done_base_tag`` used
    by ``CrawlParseBaseData.output()`` -- confirm that this is intended.
    """

    version = luigi.Parameter()

    def requires(self):
        """Return the upstream ``CrawlParseBaseData`` task for ``version``."""
        return CrawlParseBaseData(version=self.version)

    def run(self):
        """Placeholder for the task's work.

        The template originally left this method without a body, which is a
        syntax error that prevents the whole module from being imported.
        """
        # TODO: implement the actual processing and write to self.output().

    def output(self):
        """Return the HDFS target located at ``done_base_tag``."""
        return luigi.contrib.hdfs.HdfsTarget(done_base_tag)

if __name__ == '__main__':
    # The task version is the local date two days before now, as YYYYMMDD.
    date_str = (
        datetime.now() - timedelta(days=2)
    ).strftime('%Y%m%d')
    luigi.build([CrawlDataParserTask(version=date_str)])

 类似资料: