Storm可支持多种语言,其中就有python .
首先需要创建一个类,
public static class BasieCalculateBolt extends ShellBolt implementsIRichBolt {publicBasieCalculateBolt() {super("python", "bolt_base_calculate.py");
}
@Overridepublic voiddeclareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Overridepublic MapgetComponentConfiguration() {return null;
}
}
引用的bolt_base_calculate.py放置的目录必须为本项目的resources目录,本项目的py文件放置于mutilang/resources目录下,则要在maven的pom.xml文件中将其设置为resource目录。
src/jvm
test/jvm
${basedir}/multilang
maven-assembly-plugin
一个最简单的Python bolt如下所示:
importstormclassSplitSentenceBolt(storm.BasicBolt):defprocess(self, tup):
words= tup.values[0].split(" ")for word inwords:
storm.emit([word])
SplitSentenceBolt().run()
在resources目录下还需放置在官网上下载的最新storm.py文件,https://github.com/apache/storm/blob/master/bin/storm.py。
python的bolt中不可有print语句,因为storm中Python bolt和其他bolt之间数据的传递的便是通过监控console输出的数据。但是我们在Python中需要打印一些log来查看程序的运行,此时可使用log,即创建一个log.py
importloggingimportlogging.configimportos
logging.config.fileConfig('logging.conf')#create logger 下面是你工程的名称
logger = logging.getLogger('calculateEngine')
logging.conf的配置可设置为
[loggers]
keys=root,calculateEngine
[handlers]
keys=fileHandler,consoleHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
handlers=consoleHandler
[logger_calculateEngine]
level=INFO
handlers=fileHandler
qualname=calculateEngine
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=WARN
formatter=simpleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=FileHandler
level=DEBUG
maxBytes=10485760
backupCount=20
encoding=utf8
formatter=simpleFormatter
args=(os.path.join(os.path.dirname(__file__),'asien_calculate.log'),'a')
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
在python中的使用,只需from log import logger ,log.info("")即可