我一直在通过PuTTy登录到SSH来运行Hadoop MapReduce作业,这要求我在PuTTy中输入主机名/IP地址、登录名和密码,以获得SSH命令行窗口。进入SSH控制台窗口后,我将提供适当的MR命令,例如:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar-文件 /nfs_home/appers/user1/mapper.py-文件 /nfs_home/appers/user1/reducer.py-mapper'/usr/lib/python_2.7.3/bin/pythonmapper.py'-减速机'/usr/lib/python_2.7.3/bin/pythonreducer.py'-输入 /ccexp/data/test_xml/0901282-510179094535002-oozie-oozi-W/提取//. xml-输出 /user/ccexptest/output/user1/MRoutput
我想做的是使用Python来改变这个笨拙的过程,这样我就可以从Python脚本中启动MapReduce作业,而不必通过PuTTy登录SSH。
这可以做到吗?如果可以,有人能告诉我怎么做吗?
我用下面的脚本解决了这个问题:
import paramiko
# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxx'
pw = 'xxxxxxxx'
# Paths
input_loc = '/nfs_home/appers/extracts/*/*.xml'
output_loc = '/user/lcmsprod/output/cnielsen/'
python_path = "/usr/lib/python_2.7.3/bin/python"
hdfs_home = '/nfs_home/appers/cnielsen/'
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
# File names
xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
product_name = 'test1'
output_ref = 'test65'
# ----------------------------------------------------
def buildMRcommand(product_name):
space = " "
mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
'-files', hdfs_home+xml_lookup_file,
'-file', hdfs_home+mapper,
'-file', hdfs_home+reducer,
'-mapper', "'"+python_path, mapper, product_name+"'",
'-file', hdfs_home+helper_script,
'-reducer', "'"+python_path, reducer+"'",
'-input', input_loc,
'-output', output_loc+output_ref]
MR_command = space.join(mr_command_list)
print MR_command
return MR_command
# ----------------------------------------------------
def unbuffered_lines(f):
line_buf = ""
while not f.channel.exit_status_ready():
line_buf += f.read(1)
if line_buf.endswith('\n'):
yield line_buf
line_buf = ''
# ----------------------------------------------------
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host_ip, username=user, password=pw)
# Build Commands
list_dir = "ls "+hdfs_home+" -l"
getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+"test_011216_0.txt"
# Run Command
stdin, stdout, stderr = client.exec_command(list_dir)
##stdin, stdout, stderr = client.exec_command(buildMRcommand(product_name))
##stdin, stdout, stderr = client.exec_command(getmerge)
print "Executing command..."
writer = open(output_log, 'w')
for l in unbuffered_lines(stderr):
e = '[stderr] ' + l
print '[stderr] ' + l.strip('\n')
writer.write(e)
for line in stdout:
r = '[stdout]' + line
print '[stdout]' + line.strip('\n')
writer.write(r)
client.close()
writer.close()
我正在使用spring批处理和spring批处理管理我们的日常批处理工作。我使用Tomcat7通过spring批处理管理UI启动作业对其进行了测试。 我的工作有跨不同服务器的远程分区步骤,我使用rabbitmq作为中间件,spring批量集成用于远程分区。 在测试过程中,我在所有服务器上部署应用程序,在所有服务器上启动tomcat,以启动所有监听器(入站网关并发线程),启动所有bean。 现在我想
问题内容: 我正在尝试编写一个Jenkins作业(例如CopyJob),该作业将复制另一个作业(在此作业中使用Multijob插件),并且还将其所有下游作业复制到新作业中。想法是要有一个Multijob作为模板,以便可以将其复制到新的Multijobs中(例如,用于特定的分支或功能)。 看到: 当手动触发“ CopyJob”时,它将使用新的SubJobs创建一个新的MultiJob: 到目前为止,
我在Unix系统(OEL)上安装了一个Jenkins master。我配置了两个通过SSH启动的Unix从机(也是OEL)。从属程序永远不会启动,主程序的GUI中也不会报告错误(根本没有输出,只是一个旋转的球): 我在文件中看到的唯一警告是: 主服务器上的JDK版本是: 从服务器上的Java版本: 看起来Jenkins没有启动任何SSH连接。 将Jenkins升级到1.607版本后,在尝试启动从机
我正在尝试运行一个python3脚本,每天在特定的时间检查电子邮件的特定条件。 我可以看到crontab调用了这些命令,但脚本没有给出我需要的结果,即似乎没有运行。我可以在syslog中看到cron的执行: 8月3日16:25:01 raspberrypi/USR/SBIN/CRON[4597]:(pi)CMD(cd/home/pi/pythonscripts) 8月3日16:25:01 rasp
我正在测试Spring Batch重新启动功能,其中我期望在执行之间手动终止的作业在提供JOB_EXECUTION_ID时重新启动。 我更新了BATCH_JOB_EXECUTION和BATCH_STEP_EXECUTION表,使其状态和退出代码为“失败”。但是当我试图通过提供EXECUTION_ID来重新启动作业时,我得到了如下的异常 有人能告诉我我在这里犯了什么错误吗?
我正在使用quartz调度器来调度一个Spring批处理作业。应用程序启动时没有任何异常,但它从不激发任何作业。 它运行得很成功,但经过更多的开发后,它停止了工作。我无法弄清楚我到底改变了什么配置导致了这一点。 有谁可以建议检查点在使用“JobRepositoryFactoryBean”,如果我没有或问题在其他地方。