当前位置: 首页 > 知识库问答 >
问题:

如何使用Azure数据工厂将CosmosDb文档复制到Blob存储(每个文档位于单个json文件中)

石超
2023-03-14

我正在尝试使用Azure数据工厂(v2)备份我的Cosmos Db存储。一般来说,它在做它的工作,但我想让Cosmos集合中的每个文档对应blobs存储中的新json文件。

使用下一个复制参数,我可以将集合中的所有文档复制到azure blob存储中的1个文件中:

{
"name": "ForEach_mih",
"type": "ForEach",
"typeProperties": {
    "items": {
        "value": "@pipeline().parameters.cw_items",
        "type": "Expression"
    },
    "activities": [
        {
            "name": "Copy_mih",
            "type": "Copy",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
            },
            "userProperties": [
                {
                    "name": "Source",
                    "value": "@{item().source.collectionName}"
                },
                {
                    "name": "Destination",
                    "value": "cosmos-backup-v2/@{item().destination.fileName}"
                }
            ],
            "typeProperties": {
                "source": {
                    "type": "DocumentDbCollectionSource",
                    "nestingSeparator": "."
                },
                "sink": {
                    "type": "BlobSink"
                },
                "enableStaging": false,
                "enableSkipIncompatibleRow": true,
                "redirectIncompatibleRowSettings": {
                    "linkedServiceName": {
                        "referenceName": "Clear_Test_BlobStorage",
                        "type": "LinkedServiceReference"
                    },
                    "path": "cosmos-backup-logs"
                },
                "cloudDataMovementUnits": 0
            },
            "inputs": [
                {
                    "referenceName": "SourceDataset_mih",
                    "type": "DatasetReference",
                    "parameters": {
                        "cw_collectionName": "@item().source.collectionName"
                    }
                }
            ],
            "outputs": [
                {
                    "referenceName": "DestinationDataset_mih",
                    "type": "DatasetReference",
                    "parameters": {
                        "cw_fileName": "@item().destination.fileName"
                    }
                }
            ]
        }
    ]
}
}

如何将每个cosmos文档复制到单独的文件中,并将其命名为{PartitionId}-{docId}?

断续器

源代码集:

{
"name": "ClustersData",
"properties": {
    "linkedServiceName": {
        "referenceName": "Clear_Test_CosmosDb",
        "type": "LinkedServiceReference"
    },
    "type": "DocumentDbCollection",
    "typeProperties": {
        "collectionName": "directory-clusters"
    }
}
}

目标集代码:

{
"name": "OutputClusters",
"properties": {
    "linkedServiceName": {
        "referenceName": "Clear_Test_BlobStorage",
        "type": "LinkedServiceReference"
    },
    "type": "AzureBlob",
    "typeProperties": {
        "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects"
        },
        "fileName": "",
        "folderPath": "cosmos-backup-logs"
    }
}
}

管道代码:

{
"name": "copy-clsts",
"properties": {
    "activities": [
        {
            "name": "LookupClst",
            "type": "Lookup",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
            },
            "typeProperties": {
                "source": {
                    "type": "DocumentDbCollectionSource",
                    "nestingSeparator": "."
                },
                "dataset": {
                    "referenceName": "ClustersData",
                    "type": "DatasetReference"
                },
                "firstRowOnly": false
            }
        },
        {
            "name": "ForEachClst",
            "type": "ForEach",
            "dependsOn": [
                {
                    "activity": "LookupClst",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "typeProperties": {
                "items": {
                    "value": "@activity('LookupClst').output.value",
                    "type": "Expression"
                },
                "batchCount": 8,
                "activities": [
                    {
                        "name": "CpyClst",
                        "type": "Copy",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "DocumentDbCollectionSource",
                                "query": "select @{item()}",
                                "nestingSeparator": "."
                            },
                            "sink": {
                                "type": "BlobSink"
                            },
                            "enableStaging": false,
                            "enableSkipIncompatibleRow": true,
                            "cloudDataMovementUnits": 0
                        },
                        "inputs": [
                            {
                                "referenceName": "ClustersData",
                                "type": "DatasetReference"
                            }
                        ],
                        "outputs": [
                            {
                                "referenceName": "OutputClusters",
                                "type": "DatasetReference"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}
}

输入集合中doc的示例(格式相同):

{
   "$type": "Entities.ADCluster",
    "DisplayName": "TESTNetBIOS",
    "OrgId": "9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
    "ClusterId": "ab2a242d-f1a5-62ed-b420-31b52e958586",
    "AllowLdapLifeCycleSynchronization": true,
    "DirectoryServers": [
        {
            "$type": "Entities.DirectoryServer",
            "AddressId": "e6a8edbb-ad56-4135-94af-fab50b774256",
            "Port": 389,
            "Host": "192.168.342.234"
        }
    ],
    "DomainNames": [
        "TESTNetBIOS"
    ],
    "BaseDn": null,
    "UseSsl": false,
    "RepositoryType": 1,
    "DirectoryCustomizations": null,
    "_etag": "\"140046f2-0000-0000-0000-5ac63a180000\"",
    "LastUpdateTime": "2018-04-05T15:00:40.243Z",
    "id": "ab2a242d-f1a5-62ed-b420-31b52e958586",
    "PartitionKey": "directory-clusters-9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
    "_rid": "kpvxLAs6gkmsCQAAAAAAAA==",
    "_self": "dbs/kvpxAA==/colls/kpvxLAs6gkk=/docs/kvpxALs6kgmsCQAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1522940440
}

共有3个答案

夏雅志
2023-03-14

查找和复制活动源数据集引用相同的 cosmosdb 数据集。

如果您想复制您的5个集合,您可以将此管道放入执行活动中。执行活动的主管道有一个foreach活动。

阳博赡
2023-03-14

你是否考虑过用Azure函数以不同的方式实现这一点?ADF设计用于将大量数据从一个地方移动到另一个地方,并且每个集合只生成一个文件。

您可以考虑在集合中添加/更新文档时触发Azure函数,并让Azure函数将文档输出到blob存储。这应该可以很好地扩展,并且相对容易实现。

储修谨
2023-03-14

由于您的cosmosdb具有数组并且ADF不支持cosmos db的序列化数组,因此这是我可以提供的解决方法。

首先,将所有文档导出到json文件,并按原样导出json(到blob或adls或文件系统,任何文件存储)。我想你已经知道如何做到这一点。这样,每个集合都将有一个json文件。

其次,处理每个json文件,将文件中的每一行精确到一个文件中。

我只为步骤2提供管道。您可以使用执行管道活动来链接步骤1和步骤2。您甚至可以使用foreach活动处理步骤2中的所有集合。

管道json

{
"name": "pipeline27",
"properties": {
    "activities": [
        {
            "name": "Lookup1",
            "type": "Lookup",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
            },
            "typeProperties": {
                "source": {
                    "type": "BlobSource",
                    "recursive": true
                },
                "dataset": {
                    "referenceName": "AzureBlob7",
                    "type": "DatasetReference"
                },
                "firstRowOnly": false
            }
        },
        {
            "name": "ForEach1",
            "type": "ForEach",
            "dependsOn": [
                {
                    "activity": "Lookup1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "typeProperties": {
                "items": {
                    "value": "@activity('Lookup1').output.value",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "Copy1",
                        "type": "Copy",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "DocumentDbCollectionSource",
                                "query": {
                                    "value": "select @{item()}",
                                    "type": "Expression"
                                },
                                "nestingSeparator": "."
                            },
                            "sink": {
                                "type": "BlobSink"
                            },
                            "enableStaging": false,
                            "cloudDataMovementUnits": 0
                        },
                        "inputs": [
                            {
                                "referenceName": "DocumentDbCollection1",
                                "type": "DatasetReference"
                            }
                        ],
                        "outputs": [
                            {
                                "referenceName": "AzureBlob6",
                                "type": "DatasetReference",
                                "parameters": {
                                    "id": {
                                        "value": "@item().id",
                                        "type": "Expression"
                                    },
                                    "PartitionKey": {
                                        "value": "@item().PartitionKey",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                ]
            }
        }
    ]
},
"type": "Microsoft.DataFactory/factories/pipelines"

}

用于查找的数据集json

   {
"name": "AzureBlob7",
"properties": {
    "linkedServiceName": {
        "referenceName": "bloblinkedservice",
        "type": "LinkedServiceReference"
    },
    "type": "AzureBlob",
    "typeProperties": {
        "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects"
        },
        "fileName": "cosmos.json",
        "folderPath": "aaa"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

复制的源数据集。实际上,这个数据集没有用。只想用它来托管查询(选择@{项目()}

{
"name": "DocumentDbCollection1",
"properties": {
    "linkedServiceName": {
        "referenceName": "CosmosDB-r8c",
        "type": "LinkedServiceReference"
    },
    "type": "DocumentDbCollection",
    "typeProperties": {
        "collectionName": "test"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

目标数据集。通过两个参数,它还处理了您的文件名请求。

{
"name": "AzureBlob6",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureStorage-eastus",
        "type": "LinkedServiceReference"
    },
    "parameters": {
        "id": {
            "type": "String"
        },
        "PartitionKey": {
            "type": "String"
        }
    },
    "type": "AzureBlob",
    "typeProperties": {
        "format": {
            "type": "JsonFormat",
            "filePattern": "setOfObjects"
        },
        "fileName": {
            "value": "@{dataset().PartitionKey}-@{dataset().id}.json",
            "type": "Expression"
        },
        "folderPath": "aaacosmos"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

另请注意查找活动的限制:查找支持以下数据源。查找活动可以返回的最大行数为 5000,最大为 2MB。目前,超时前查找活动的最大持续时间为一小时。

 类似资料:
  • 我想通过运行在Azure VM上的FTP服务器与用户共享Azure Blob存储中的文件。 据我所知,您不能在VM上挂载Blob存储,但可以使用“网络使用”挂载Azure文件共享。 Blob存储上的文件将以增量方式上载,因此理想情况下,我希望在上载时将其复制到Azure文件,Azure功能似乎是理想的方式,因为它们很容易为我设置和处理Blob存储上的触发器。 我如何使用Azure功能将文件从Blo

  • 我不熟悉CosmosDb和azure blob存储,因为我需要引用一个从CosmosDb中的文档上传到azure blob存储的文件,并在附件部分使用它来保存元数据。 我知道json元数据结构应该是这样的: 但是如何在将文件上传到azure blob存储时获取对媒体属性的引用,更准确地说,如何将文件从c#上传到azure blob存储并通过将url设置为媒体属性来引用它。

  • 我正在尝试将我使用创建的文档上传到Azure blob存储。在本地,我可以保存文件,并使用方法将文件从本地磁盘上传到Azure。但是,我想使用Azure Function将文件直接上传到Azure存储,关于如何从方法上传到Azure存储,有什么想法吗?保存方法需要path_or_stream

  • 我的应用程序见解中有一个“持续导出”过程,该过程基于我的新见解创建新文件。 除此之外,我有一个过程,使用Azure数据工厂,用Blob存储数据加载一个SQL表。 问题是:我不能从ADF读取数据,只能从Blob存储中读取新文件,而且我总是在处理相同的数据。此时,我忽略了SQL存储过程中加载过程后的重复数据,但我想通过只从Blob存储中读取新数据来提高这个过程的效率,我可以从ADF读取新数据吗?有人能

  • 如何将 avro 文件从 Blob 存储加载到 Azure 数据工厂 移动数据流?我正在尝试加载,但无法导入架构和预览。我在 Blob 中的 avro 文件是事件中心捕获函数的结果。我必须使用 Azure 数据工厂的移动数据流将数据从 Azure blob 移动到 Azure sql db。

  • 问题内容: 将节点从一个文档复制到另一个文档时遇到问题。我已经使用了Node中的acceptNode和importNode方法,但是它们不起作用。我也尝试了appendChild,但是抛出了异常。我正在使用Xerces。那里没有实现吗?还有另一种方法吗? 问题答案: 问题在于,节点的上下文包含许多内部状态,其中包括其父项和拥有它们的文档。无论是也将目标文档,这就是为什么你的代码是没有的新节点的任何