当前位置: 首页 > 面试题库 >

使用Python或Java从本地将数据上传到Azure ADLS Gen2

王叶五
2023-03-14
问题内容

我在Data Lake Gen2中拥有一个Azure存储帐户。我想使用Python(或Java)将数据从本地上传到Lake Gen2文件系统。

我已经找到了有关如何与存储帐户中的文件共享进行交互的示例,但是我仍无法找到如何上传到Lake(而不是文件共享)的示例。我还发现了如何在Gen1
Lakes上执行此操作,但是除了已关闭的Gen2
请求外,什么都没有。

我的问题是,到今天为止,是否可以用Python做到这一点?或者,如何使用Java将文件上传到Gen2
Lake?演示用于上传的API调用的代码片段将受到高度赞赏。


问题答案:

根据官方教程Quickstart: Upload, download, and list blobs with Python,如下所示,如果尚未注册Data Lake
Storage上
的多协议访问的公共预览,则不能直接使用Python的Azure存储SDK在Azure
Data Lake Store Gen 2中进行任何操作。

注意

仅当您在Data Lake Storage上注册多协议访问的公共预览时,本文中介绍的功能才可用于具有分层名称空间的帐户。要查看限制,请参阅已知问题文章。

因此,将数据上传到ADLS Gen2的唯一解决方案是使用ADLS Gen2的REST API,请参阅其参考Azure Data Lake Store REST API

这是我的示例代码,可以使用Python将数据上传到ADLS Gen2,并且工作正常。

import requests
import json

def auth(tenant_id, client_id, client_secret):
    print('auth')
    auth_headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    auth_body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope" : "https://storage.azure.com/.default",
        "grant_type" : "client_credentials"
    }
    resp = requests.post(f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", headers=auth_headers, data=auth_body)
    return (resp.status_code, json.loads(resp.text))

def mkfs(account_name, fs_name, access_token):
    print('mkfs')
    fs_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem", headers=fs_headers)
    return (resp.status_code, resp.text)

def mkdir(account_name, fs_name, dir_name, access_token):
    print('mkdir')
    dir_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory", headers=dir_headers)
    return (resp.status_code, resp.text)

def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    print('touch_file')
    touch_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file", headers=touch_file_headers)
    return (resp.status_code, resp.text)

def append_file(account_name, fs_name, path, content, position, access_token):
    print('append_file')
    append_file_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "text/plain",
        "Content-Length": f"{len(content)}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}", headers=append_file_headers, data=content)
    return (resp.status_code, resp.text)

def flush_file(account_name, fs_name, path, position, access_token):
    print('flush_file')
    flush_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}", headers=flush_file_headers)
    return (resp.status_code, resp.text)

def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    print('mkfile')
    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code == 201:
        with open(local_file_name, 'rb') as local_file:
            path = f"{dir_name}/{file_name}"
            content = local_file.read()
            position = 0
            append_file(account_name, fs_name, path, content, position, access_token)
            position = len(content)
            flush_file(account_name, fs_name, path, position, access_token)
    else:
        print(result)


if __name__ == '__main__':
    tenant_id = '<your tenant id>'
    client_id = '<your client id>'
    client_secret = '<your client secret>'

    account_name = '<your adls account name>'
    fs_name = '<your filesystem name>'
    dir_name = '<your directory name>'
    file_name = '<your file name>'
    local_file_name = '<your local file name>'

    # Acquire an Access token
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    access_token = auth_status_code == 200 and auth_result['access_token'] or ''
    print(access_token)

    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)

    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)

    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)

希望能帮助到你。



 类似资料:
  • 问题内容: 我必须使用Python脚本自动将文件夹上传到FTP。我可以上传单个文件,但不能上传包含子文件夹和文件的文件夹。我做了很多搜索,但是失败了。有人可以帮我吗?提前致谢。 问题答案: 基本上,您需要使用os.walk()来获取这些文件并进行传输。 这是我为自己编写的脚本,可以完成您的大部分要求。我是很久以前写的,所以如果我再次写它,我可能会做不同的事情,但是我从中得到了很多利用。 它导入ps

  • 我需要使用python从组织终端下的本地上传文件到AWS S3存储桶。因为我是大型机资源,所以我对python不太了解,但是,当我尝试阅读一些内容时,我知道boto3可以帮助我。有人能帮我开始吗?

  • 我正在使用Jython从Java运行python脚本。当我使用调用python脚本时,我可以将数据作为命令行参数发送到python脚本。

  • 在控制器中(也可能是Main或另一个文件,我不确定这一切是如何工作的),我们有以下内容: 在Scene Bilder生成的FXML中,如下所示:

  • 我正在尝试用我的服务帐户在我的机器上本地使用简单的代码上传一个图片到Google云存储: 然而,我得到了下面的错误消息。是Google Cloud API只能在部署到App Engine上时工作,还是我在这里做错了什么?我能够使用相同的服务帐户使Google Vision API在本地工作。

  • 问题内容: 我有一个长度为2.2亿(固定)的int和float数组。现在,我想将这些阵列存储到内存和磁盘/从内存和磁盘上载。目前,我正在使用Java NIO的FileChannel和MappedByteBuffer解决此问题。它可以正常工作,但大约需要5秒钟(Wall Clock Time)(用于将阵列存储到内存或从内存上载到磁盘或从磁盘上载到磁盘)。实际上,我想要一个更快的。有人可以帮我吗,有没