Question:

How do I use Data Pipeline to copy data from one DynamoDB table to another when both tables use on-demand capacity?

皇甫飞跃
2023-03-14

I have been using pipeline.json to copy data from one DynamoDB table to another. It works when the source table has provisioned capacity, regardless of whether the destination is set to provisioned or on-demand. However, I want both of my tables to use on-demand capacity, and with that setup the same template no longer works. Is there any way to do this, or is support for it still under development?
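
As an aside, switching an existing table between provisioned and on-demand billing is a single CLI call per table; the sketch below assumes a configured AWS CLI and uses placeholder table names:

# Switch both tables to on-demand (PAY_PER_REQUEST) billing; table names are placeholders.
aws dynamodb update-table --table-name SourceTable --billing-mode PAY_PER_REQUEST
aws dynamodb update-table --table-name DestinationTable --billing-mode PAY_PER_REQUEST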

Here is my original working script:

{
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule",
            "occurrences": "1"
        },
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "schedule": {
                "ref": "DailySchedule"
            },
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "dependsOn": {
               "ref": "TableBackupActivity"
            },
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-emr-#{myDDBDestinationRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-emr-#{myDDBSourceRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForBackup"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}

Below is the error message from the Data Pipeline run when the source DynamoDB table is set to on-demand capacity:

at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.run(DynamoDbExport.java:79)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.main(DynamoDbExport.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
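
As a side note, the billing mode a table is actually using can be checked with the AWS CLI; this is a minimal check, with the table name as a placeholder:

# Prints PAY_PER_REQUEST for on-demand tables and PROVISIONED otherwise
# (BillingModeSummary can be empty for older tables that were never switched).
aws dynamodb describe-table --table-name SourceTable \
    --query 'Table.BillingModeSummary.BillingMode' --output text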

1 answer

陈俊誉
2023-03-14

The following JSON file works for the export (DynamoDB to S3). The key difference from your template is the newer EMR release (emr-5.23.0) and the 4.11.0 emr-dynamodb-tools jar in place of AMI 3.8.0 and the 2.1.0 jar, which is what lets the job read a table in on-demand mode:

{
    "objects": [
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/data"
        },
        {
            "subnetId": "subnet-id",
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "masterInstanceType": "m5.xlarge",
            "coreInstanceType": "m5.xlarge",
            "coreInstanceCount": "1",
            "releaseLabel": "emr-5.23.0",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-dpl-#{myDDBSourceRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}
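
If it helps, this is roughly how the export definition can be registered and run with the AWS CLI; the pipeline name, file name, pipeline ID, and parameter values are placeholders for your own:

# Create an empty pipeline and note the returned pipelineId (df-...)
aws datapipeline create-pipeline --name ddb-export --unique-id ddb-export-001

# Upload the definition above (saved as export.json) and fill in the parameters
aws datapipeline put-pipeline-definition --pipeline-id df-EXAMPLE \
    --pipeline-definition file://export.json \
    --parameter-values myS3LogsPath=s3://my-bucket/logs myDDBSourceTableName=SourceTable \
        myDDBSourceRegion=us-west-2 myDDBReadThroughputRatio=0.5 myTempS3Folder=s3://my-bucket/tmp

# Activate the pipeline and watch its runs
aws datapipeline activate-pipeline --pipeline-id df-EXAMPLE
aws datapipeline list-runs --pipeline-id df-EXAMPLE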

And the following works for the import (S3 to DynamoDB):

{
    "objects": [
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/data"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "subnetId": "subnet-id",
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "releaseLabel": "emr-5.23.0",
            "masterInstanceType": "m5.xlarge",
            "coreInstanceType": "m5.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-dpl-#{myDDBDestinationRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForLoad"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}

Also, the subnetId fields in both pipeline definitions are entirely optional, but it is always good to set them.
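
Once both pipelines finish, a rough sanity check is to compare the item counts reported by DescribeTable on the source and target tables; note that DynamoDB only refreshes ItemCount roughly every six hours, so this is an approximation rather than an exact verification. Table names are placeholders:

# Approximate item counts; DynamoDB refreshes ItemCount about every six hours.
aws dynamodb describe-table --table-name SourceTable --query 'Table.ItemCount'
aws dynamodb describe-table --table-name DestinationTable --query 'Table.ItemCount'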
