当前位置: 首页 > 知识库问答 >
问题:

Azure Blob存储和Azure数据库之间的高效数据检索过程

陆弘新
2023-03-14

我正在设计一个新的数据环境,目前正在开发我的概念证明。在这里,我使用以下架构:Azure函数-

我目前正在努力解决的是如何优化“数据检索”的想法,以便在Azure Databricks上支持我的ETL过程。

我正在处理事务性工厂数据,这些数据通过前面的通道按分钟提交给Azure blob存储。因此,我每天都有86000个文件需要处理。事实上,这需要处理大量的独立文件。目前,我使用下面这段代码来构建一个当前存在于azure blob存储中的文件名列表。接下来,我通过使用循环读取每个文件来检索它们。

我面临的问题是这个过程需要时间。当然,我们在这里谈论的是需要读取的大量小文件。所以我不指望这个过程能在几分钟内完成。

我知道升级数据库管理员集群可能会解决这个问题,但我不确定只有这样才能解决这个问题,看看在这种情况下我需要传输的文件数量。我正在运行数据库管理员的代码。

# Define function to list content of mounted folder
def get_dir_content(ls_path):
  dir_paths = ""
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
filenames = []
paths = 0

mount_point = "PATH"

paths = get_dir_content(mount_point)
for p in paths:
#   print(p)
  filenames.append(p)

avroFile = pd.DataFrame(filenames)
avroFileList = avroFile[(avroFile[0].str.contains('.avro')) & (avroFile[0].str.contains('dbfs:/mnt/PATH'))]
avro_result = []
# avro_file = pd.DataFrame()
avro_complete = pd.DataFrame()
for i in avroFileList[0]:
  avro_file = spark.read.format("avro").load(i)
  avro_result.append(avro_file)

最后,我为所有这些文件做了一个联合,以创建一个数据框架。

# Schema definiëren op basis van 
avro_df = avro_result[0]

# Union all dataframe
for i in avro_result:
  avro_df = avro_df.union(i)

display(avro_df)

我想知道如何优化这个过程。按分钟输出的原因是,一旦我们有了分析报告架构(我们只需要一个日常流程),我们计划稍后构建一个“接近实时的洞察力”。

共有2个答案

司徒斌
2023-03-14

有多种方法可以做到这一点,但我会这样做:

每当在你的azure存储帐户中创建新的blob时,使用Azure函数触发你的python代码。这将删除代码中的轮询部分,并在存储帐户上有可用文件时立即将数据发送到数据块

例如,对于接近实时的报告,您可以使用Azure流分析并在Event Hub和output上运行查询来支持Bi。

应俭
2023-03-14

与其列出文件,然后单独阅读它们,我建议您使用Azure Databricks Autoloader。它可以使用通知来查找上传到blob存储的新文件,而不是列出文件。

它还可以在某个时间点处理多个文件,而不是一个接一个地读取它们

如果不需要连续处理数据,则可以使用 .trigger(once=True)来模拟数据的批量加载。

 类似资料:
  • 我正在使用Facenet算法进行人脸识别。我想基于此创建应用程序,但问题是Facenet算法返回一个长度为128的数组,即每个人的人脸嵌入。 对于人物识别,我必须找到两个人面部嵌入之间的欧几里得差异,然后检查它是否大于阈值。如果是,那么这些人是相同的;如果它小于,那么这些人是不同的。 比方说,如果我必须在10k人的数据库中找到人x。我必须计算每个人嵌入的差异,这是没有效率的。 有没有办法有效地存储

  • 我需要你的一些建议。我试图用redis和哈希(redis类型)存储一些非常有效的内存数据。有一些随机字符串列表(在rfc中平均大小是40个字符,但最大可能是255个字符)--它是文件id,例如我们有100kk的file_id列表。我们还需要每个ID的轨道2参数:download_count(int,incremented)和server_id--tiny int,redis config添加了:

  • 问题内容: 您好,我在较早之前发布了此内容,并获得了一些帮助,但仍然没有有效的解决方案。由于最后的问与答,我确定我的“保存到数据库”代码以及“检索到图片”代码有问题。即使我手动将图片保存在数据库中,它也不会恢复。这是我从网络上的3或4个示例中修补而成的代码。理想情况下,如果有人拥有一些已知的良好代码,并且可以指导我这样做,那将是最好的。 ![在此处输入图片描述] [1] ‘*使用以下代码检索到图片

  • 我正在数据砖笔记本上运行这个 我得到了这个错误 原因:存储异常:服务器无法对请求进行身份验证。确保授权头的值格式正确,包括签名。 我尝试使用 以获取 Azure Blob 存储中的任何更新,但仍收到上述错误。

  • 本文向大家介绍如何使用Python将数据存储和检索到Sqlite3数据库中?,包括了如何使用Python将数据存储和检索到Sqlite3数据库中?的使用技巧和注意事项,需要的朋友参考一下 您可以使用sqlite3模块轻松地将日期存储和检索到Sqlite3数据库中。在数据库中插入日期时,直接传递日期,Python会自动处理它。 示例 输出结果 这将给出输出- 现在,当您从数据库中获取值时,您将获得已