【Azure Data Factory】第一篇 入门

端木令
2023-12-01

https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction
https://learn.microsoft.com/en-us/azure/data-factory/
https://learn.microsoft.com/en-us/azure/synapse-analytics/

https://learn.microsoft.com/en-us/azure/synapse-analytics/
https://learn.microsoft.com/en-us/azure/synapse-analytics/overview-what-is
https://learn.microsoft.com/en-us/azure/synapse-analytics/get-started
https://learn.microsoft.com/en-us/azure/synapse-analytics/machine-learning/what-is-machine-learning

win 环境变量
person-Path
;%HADOOP_HOME%\bin;
%JAVA_HOME%\bin;
%JAVA_HOME%\jre\bin;
%MAVEN_HOME%\bin;
D:\JAVA\SSH Secure Shell;
%USERPROFILE%.dotnet\tools

person-ClassPath
.;%JAVA_HOME%\lib\dt.jar;
%JAVA_HOME%\lib\tools.jar;

all-ClassPath
.;%JAVA_HOME%\lib\dt.jar;
%JAVA_HOME%\lib\tools.jar;

all-Path
D:\JAVA\spark-2.4.6-bin-hadoop2.6\bin;
%SYSTEMROOT%\System32\WindowsPowerShell\v1.0\;
%SystemRoot%\System32\Wbem;%SystemRoot%;
%SystemRoot%\system32;%HADOOP_HOME%\bin;
%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;
%MAVEN_HOME%\bin;
C:\Python27\;
C:\Python27\Scripts;
%CommonProgramFiles%\Microsoft Shared\Windows Live;
C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;
C:\Windows\System32\Wbem;C:\Program Files\Intel\WiFi\bin\;
C:\Program Files\Common Files\Intel\WirelessCommon\;
D:\MySQL Server 5.0\bin;C:\Windows\System32;
D:\JAVA\oracle-tools\instantclient_12_1;
D:\JAVA\Scala\bin;
D:\MicrosoftVSCode\bin;
D:\mingw64\bin;C:\Program Files\dotnet\

================================================================================
Azure 存储平台包括以下数据服务:
Azure Blobs:一个可大规模扩展的文本和二进制数据对象存储。还包括通过 Data Lake Storage Gen2 支持大数据分析。
Azure 文件:用于云或本地部署的托管文件共享。
Azure Elastic SAN(预览版):一个完全集成的解决方案,可简化 Azure 中 SAN 的部署、扩展、管理和配置。
Azure 队列:用于应用程序组件之间可靠消息传递的消息存储。
Azure 表:用于结构化数据的无模式存储的 NoSQL 存储。
Azure 托管磁盘:Azure VM 的块级存储卷。
Azure NetApp 文件:企业文件存储,由 NetApp 提供支持:使企业业务线 (LOB) 和存储专业人员可以轻松迁移和运行复杂的、基于文件的应用程序,而无需更改代码。Azure NetApp 文件通过 NetApp 帐户进行管理,可以通过 NFS、SMB 和双协议卷进行访问。

安全访问存储帐户
用于 blob、队列和表数据的 Azure Active Directory (Azure AD) 集成
通过 SMB 的 Azure AD 授权 Azure 文件
使用共享密钥授权
使用共享访问签名 (SAS) 的授权
带有 Azure NetApp 文件的 Active Directory 域服务

加密
静态加密 : Azure 存储加密保护你的数据,以满足你的组织安全性和合规性承诺。Azure 存储会在保存到存储帐户之前自动加密所有数据,并在检索之前对其进行解密。加密、解密和密钥管理过程对用户是透明的。
客户端加密 : Azure 存储客户端库提供了在通过网络发送数据和解密响应之前加密来自客户端库的数据的方法。

================================================================================

Azure Data Lake Storage Gen2
Data Lake Storage Gen2是一组专用于大数据分析的功能,构建于Azure Blob Storage之上,将Azure Data Lake Storage Gen1的功能与 Azure Blob Storage 融合在一起。例如,Data Lake Storage Gen2 提供文件系统语义、文件级安全性和规模。由于这些功能是建立在 Blob 存储之上的,因此您还可以获得低成本的分层存储,以及高可用性/灾难恢复功能。

Data Lake Storage Gen2 包括以下功能。
✓ Hadoop 兼容访问(Azure Blob 文件系统 (ABFS))
✓ 分层目录结构(与计算机上文件的组织方式大致相同。重命名或删除目录等操作成为目录上的单个原子元数据操作)
✓ 优化的成本和性能(分层命名空间功能允许高效访问和导航)
✓ 更细粒度的安全模型(基于角色的访问控制 (Azure RBAC) 和 UNIX (POSIX) 访问控制列表 (ACL) 的便携式操作系统接口,在目录级别或文件级别设置权限,使用 Microsoft 管理或客户管理的加密密钥进行静态加密)
✓ 巨大的可扩展性(提供海量存储并接受多种数据类型进行分析,可根据需求大小增加或缩减规模)
✓ 如果源数据位于 Azure 中,则当数据与支持 Data Lake Storage Gen2 的帐户位于同一 Azure 区域时,性能最佳。

✓Hadoop 支持一组为存储和处理结构化数据而优化的文件格式。一些常见的格式是 Avro、Parquet 和优化行列 (ORC) 格式。所有这些格式都是机器可读的二进制文件格式。它们经过压缩以帮助您管理文件大小。它们在每个文件中都嵌入了一个模式,这使它们具有自描述性。这些格式之间的区别在于数据的存储方式。Avro 以基于行的格式存储数据,而 Parquet 和 ORC 格式以列格式存储数据。

✓如果您的 I/O 模式写入量更大,或者查询模式倾向于完整检索多行记录,请考虑使用 Avro 文件格式。例如,Avro 格式与事件中心或 Kafka 等连续写入多个事件/消息的消息总线配合得很好。

✓当 I/O 模式的读取量更大或查询模式侧重于记录中列的子集时,请考虑 Parquet 和 ORC 文件格式。可以优化读取事务以检索特定列而不是读取整个记录。

✓Apache Parquet 是一种开源文件格式,针对读取繁重的分析管道进行了优化。Parquet 的列式存储结构可以让你跳过不相关的数据。您的查询效率更高,因为它们可以缩小范围将哪些数据从存储发送到分析引擎。此外,由于相似的数据类型(对于列)存储在一起,Parquet 支持高效的数据压缩和编码方案,可以降低数据存储成本。Azure Synapse Analytics、Azure Databricks和Azure Data Factory等服务具有利用 Parquet 文件格式的本机功能。

✓Consider pre-planning the structure of your data. File format, file size, and directory structure can all impact performance and cost.

✓数据管道对包含大量小文件的原始数据的控制有限。通常,系统启动某种进程,可以将小文件聚合成较大的文件。如果有实时处理数据,则可以将实时流引擎(例如Azure Stream Analytics或Spark Streaming)与消息代理(例如Event Hubs或Apache Kafka)一起使用,将数据存储为更大的文件。当将小文件聚合成更大的文件时,建议保存为读取优化格式(例如Apache Parquet)以供下游处理。

✓查询加速:通过在存储请求期间过滤掉不需要的数据而不是由应用程序读取全量数据再过滤,消除不必要的网络传输成本,节约解析和过滤不需要的数据所需的应用程序CPU 负载

================================================================================
azure-data-factory
Azure Data Factory 是基于云的 ETL 和数据集成服务,可让您创建数据驱动的工作流,以大规模编排数据移动和转换数据。使用 Azure 数据工厂,你可以创建和安排数据驱动的工作流(称为管道),这些工作流可以从不同的数据存储中提取数据。您可以构建复杂的 ETL 流程,通过数据流或使用计算服务(例如 Azure HDInsight Hadoop、Azure Databricks 和 Azure SQL 数据库)直观地转换数据。此外,您可以将转换后的数据发布到数据存储,例如 Azure Synapse Analytics,供商业智能 (BI) 应用程序使用。最终,通过 Azure 数据工厂,可以将原始数据组织成有意义的数据存储和数据湖,以做出更好的业务决策。

================================================================================
Azure Synapse Analytics
Azure Synapse是一项企业分析服务,可加快跨数据仓库和大数据系统的洞察力。Azure Synapse 汇集了企业数据仓库中使用的最佳SQL技术、用于大数据的Spark技术、用于日志和时间序列分析的Data Explorer 、用于数据集成和 ETL/ELT 的管道,以及与Power等其他 Azure 服务的深度集成BI、CosmosDB和AzureML。


行业领先的 SQL
Synapse SQL是 T-SQL 的分布式查询系统,可实现数据仓库和数据虚拟化场景,并扩展 T-SQL 以解决流和机器学习场景。
Synapse SQL 提供无服务器和专用资源模型。为了获得可预测的性能和成本,创建专用的 SQL 池来为存储在 SQL 表中的数据保留处理能力。对于计划外或突发性工作负载,请使用始终可用的无服务器 SQL 端点。使用内置的流功能将云数据源中的数据登陆到 SQL 表中。通过使用机器学习模型将 AI 与 SQL 集成,以使用T-SQL PREDICT 函数对数据进行评分。

行业标准的 Apache Spark
Apache Spark for Azure Synapse深度无缝集成了 Apache Spark——最流行的开源大数据引擎,用于数据准备、数据工程、ETL 和机器学习。具有 SparkML 算法的 ML 模型和 Apache Spark 3.1 的 AzureML 集成,内置对 Linux Foundation Delta Lake 的支持。简化的资源模型,让您无需担心管理集群。快速的 Spark 启动和积极的自动缩放。对 .NET for Spark 的内置支持允许您在 Spark 应用程序中重用您的 C# 专业知识和现有的 .NET 代码。

内置数据集成
Azure Synapse 包含与 Azure 数据工厂相同的数据集成引擎和体验,使你无需离开 Azure Synapse Analytics 即可创建丰富的大规模 ETL 管道。从 90 多个数据源中提取数据,具有数据流活动的无代码 ETL,编排笔记本、Spark 作业、存储过程、SQL 脚本等。

使用 Data Explorer 作为数据平台来构建近乎实时的日志分析和 IoT 分析解决方案,以:
跨本地、云和第三方数据源整合和关联您的日志和事件数据。
加速您的 AI Ops 之旅(模式识别、异常检测、预测等)
替换基于基础架构的日志搜索解决方案以节省成本并提高生产力。
为您的 IoT 数据构建 IoT Analytics 解决方案。
构建分析 SaaS 解决方案,为您的内部和外部客户提供服务。

统一体验
Synapse Studio为企业提供了一种在单一用户体验中构建解决方案、维护和保护所有内容的单一方法
执行关键任务:摄取、探索、准备、编排、可视化
跨 SQL、Spark 和 Data Explorer 监控资源、使用情况和用户
使用基于角色的访问控制来简化对分析资源的访问
编写 SQL、Spark 或 KQL 代码并与企业 CI/CD 流程集成


在 Synapse Analytics 中使用 SQL 语言查询 Azure Data Lake Storage Gen2
SELECT
TOP 100 *
FROM
OPENROWSET(
BULK ‘https://.dfs.core.windows.net//folder1/On_Time.csv’,
FORMAT=‘CSV’,
PARSER_VERSION=‘2.0’
) AS [result]


Analyze data with a serverless SQL pool

SELECT
TOP 100 *
FROM
OPENROWSET(
BULK ‘https://contosolake.dfs.core.windows.net/users/NYCTripSmall.parquet’,
FORMAT=‘PARQUET’
) AS [result]

CREATE DATABASE DataExplorationDB COLLATE Latin1_General_100_BIN2_UTF8
//Latin1_General_100_BIN2_UTF8 provides the best performance in the queries that read data from Parquet files and Azure Cosmos DB containers

USE DataExplorationDB

CREATE EXTERNAL DATA SOURCE ContosoLake
WITH ( LOCATION = ‘https://contosolake.dfs.core.windows.net’)

CREATE LOGIN data_explorer WITH PASSWORD = ‘My Very Strong Password 1234!’;

CREATE USER data_explorer FOR LOGIN data_explorer;
GO
GRANT ADMINISTER DATABASE BULK OPERATIONS TO data_explorer;
GO

SELECT
TOP 100 *
FROM
OPENROWSET(
BULK ‘/users/NYCTripSmall.parquet’,
DATA_SOURCE = ‘ContosoLake’,
FORMAT=‘PARQUET’
) AS [result]


Azure Databricks 和 Spark
1、创建容器并挂载
configs = {“fs.azure.account.auth.type”: “OAuth”,
“fs.azure.account.oauth.provider.type”: “org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”,
“fs.azure.account.oauth2.client.id”: “”,
“fs.azure.account.oauth2.client.secret”: “”,
“fs.azure.account.oauth2.client.endpoint”: “https://login.microsoftonline.com//oauth2/token”,
“fs.azure.createRemoteFileSystemDuringInitialization”: “true”}
dbutils.fs.mount(
source = “abfss://@.dfs.core.windows.net/folder1”,
mount_point = “/mnt/flightdata”,
extra_configs = configs)

2、Use Databricks Notebook to convert CSV to Parquet

Use the previously established DBFS mount point to read the data.

create a data frame to read data.

Python

flightDF = spark.read.format(‘csv’).options(header=‘true’, inferschema=‘true’).load(“/mnt/flightdata/*.csv”)

read the airline csv file and write the output to parquet format for easy query.

flightDF.write.mode(“append”).parquet(“/mnt/flightdata/parquet/flights”)
print(“Done”)

3、Explore data
import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls(“/mnt/flightdata”))
dbutils.fs.put(“/mnt/flightdata/1.txt”, “Hello, World!”, True)
dbutils.fs.ls(“/mnt/flightdata/parquet/flights”)

4、查询分析数据

Run each of these queries, preferably in a separate cmd cell for separate analysis

create a temporary sql view for querying flight information

FlightTable = spark.read.parquet(‘/mnt/flightdata/parquet/flights’)
FlightTable.createOrReplaceTempView(‘FlightTable’)

create a temporary sql view for querying airline code information

AirlineCodes = spark.read.parquet(‘/mnt/flightdata/parquet/airlinecodes’)
AirlineCodes.createOrReplaceTempView(‘AirlineCodes’)

using spark sql, query the parquet file to return total flights in January and February 2016

out1 = spark.sql(“SELECT * FROM FlightTable WHERE Month=1 and Year= 2016”)
NumJan2016Flights = out1.count()
out2 = spark.sql(“SELECT * FROM FlightTable WHERE Month=2 and Year= 2016”)
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
Total = NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)

List out all the airports in Texas

out = spark.sql(
“SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = ‘Texas’”)
print('Airports in Texas: ', out.show(100))

find all airlines that fly from Texas

out1 = spark.sql(
“SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName=‘Texas’”)
print('Airlines that fly to/from Texas: ', out1.show(100, False))

5、清理资源
When they’re no longer needed, delete the resource group and all related resources. To do so, select the resource group for the storage account and select Delete

 类似资料: