当前位置: 首页 > 知识库问答 >
问题:

在AWS EMR 5.0上运行的boto3中为MR作业添加流步骤

杜弘光
2023-03-14

我正在尝试将我用python编写的几个MR作业从AWS EMR 2.4迁移到AWS EMR 5.0。到目前为止,我一直在使用BOTO2.4,但它不支持EMR5.0,所以我尝试转向boto3。前面,在使用boto 2.4时,我使用了StreamingStep模块来指定输入位置和输出位置,以及映射器和reducer源文件的位置。使用这个模块,我实际上不需要创建或上传任何jar来运行我的作业。但是,我在boto3文档中找不到该模块的等效项。如何在boto3中向MR作业中添加流式处理步骤,以便不必上传jar文件来运行它?

共有1个答案

司马钱明
2023-03-14

不幸的是,boto3和EMR应用编程接口的文档很少。最起码,单词计数示例如下所示:

import boto3

emr = boto3.client('emr')

resp = emr.run_job_flow(
    Name='myjob',
    ReleaseLabel='emr-5.0.0',
    Instances={
        'InstanceGroups': [
            {'Name': 'master',
             'InstanceRole': 'MASTER',
             'InstanceType': 'c1.medium',
             'InstanceCount': 1,
             'Configurations': [
                 {'Classification': 'yarn-site',
                  'Properties': {'yarn.nodemanager.vmem-check-enabled': 'false'}}]},
            {'Name': 'core',
             'InstanceRole': 'CORE',
             'InstanceType': 'c1.medium',
             'InstanceCount': 1,
             'Configurations': [
                 {'Classification': 'yarn-site',
                  'Properties': {'yarn.nodemanager.vmem-check-enabled': 'false'}}]},
        ]},
    Steps=[
        {'Name': 'My word count example',
         'HadoopJarStep': {
             'Jar': 'command-runner.jar',
             'Args': [
                 'hadoop-streaming',
                 '-files', 's3://mybucket/wordSplitter.py#wordSplitter.py',
                 '-mapper', 'python2.7 wordSplitter.py',
                 '-input', 's3://mybucket/input/',
                 '-output', 's3://mybucket/output/',
                 '-reducer', 'aggregate']}
         }
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)

我不记得需要用boto做这件事,但是我在没有禁用vmem-check启用的情况下正确运行简单的流作业时遇到了问题。

此外,如果您的脚本位于S3上的某个地方,请使用-file下载它(将#filename附加到参数中,使下载的文件在群集中作为filename可用)。

 类似资料:
  • 我正在kubernetes上试用最新版本的Flink1.5的flink工作。 我的问题是如何在上面的flink集群上运行一个示例应用程序。flink示例项目提供了如何使用flink应用程序构建docker映像并将该应用程序提交给flink的信息。我遵循了这个例子,只是把flink的版本改成了最新版本。我发现应用程序(example-app)提交成功,并且在kubernetes的pod中显示,但是f

  • 我知道我可以用云函数和PubSub通知来完成每个写入的文件,但我更喜欢只在整个文件夹完成时这样做一次。 谢了!

  • 我刚刚开始为一个项目设置一个Github操作工作流。我试图在一个容器内运行工作流步骤,并使用以下工作流定义: 但工作流无法声明容器立即停止到期。 我尝试了很多图片,包括官方文档中描述的“alpine:3.8”图片,但容器停止了。 根据GitHub操作的工作流语法,在容器部分中:“一个容器,用于运行作业中尚未指定容器的任何步骤。”我的假设是容器将被启动,步骤将在Docker容器内运行。

  • 问题内容: 我想创建一个Jenkins作业来启动其他Jenkins作业。那将非常容易,因为Jenkins模板项目插件允许我们创建一个类型为“使用来自另一个项目的构建器”的构建步骤。但是,使我的情况更难的是,我必须在其他计算机上开始Jenkins的工作。有什么标准方法可以做到吗? 问题答案: 万一您只想触发Job的新版本,您有多种方法可以完成它 您可以使用远程访问API并触发请求以从源Job构建目标

  • 首先,我不熟悉气流。我试图做的是使用气流运行数据流作业。搜索了很多链接,发现我们需要创建一个jar文件,然后使用脚本运行它。有人知道这个罐子应该放在哪里吗,意思是放在谷歌云存储桶上还是本地路径上。我们可以通过气流直接运行模板吗

  • 目前,我使用jobParameters获取FlatFileItemReader和FlatFileItemWriter的文件名。测试我的批处理是可以的,但是我的目标是读取某个目录中的文件(这个目录中只有这个文件),并且文件名可能会更改。输出文件名应该依赖于输入文件名。 因此,我考虑在我的工作中添加一个新的步骤,这个步骤将通过搜索好的目录并向其中查找文件来设置输出和输入文件名。我从Spring Doc