当前位置: 首页 > 工具软件 > Dask > 使用案例 >

DASK

司寇祖鹤
2023-12-01

一. Dask介绍

Dask是一款用于分析计算的灵活并行计算库。
Dask 是一个开源项目,为你提供 NumPy 数组、Pandas Dataframes 以及常规 list 的抽象,
	允许你使用多核处理器并行运行它们的操作。
Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,
    与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,
    如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。
Dask由两部分组成:
    ·针对计算优化的动态任务调度。这与Airflow,Luigi,Celery或Make类似,但针对交互式计算工作负载进行了优化。
    · "大数据"集合, 像并行数组,数据框和列表一样,它们将通用接口(如NumPy,Pandas或Python迭代器)扩展到大于内存或分布式环境。 
        这些并行集合运行在动态任务调度器之上。

二. dask安装

· 在线安装:
    pip install dask: # 仅安装核心部分
    pip install "dask[complete]" : 安装完整版本
· 离线安装:
    方法步骤[https://blog.csdn.net/weixin_39916734/article/details/95345351] 
    注意: dask的有一些库要求的python版本 > 2.7.8 或者 3.4——版本过低容易被坑

三. 数据结构

· Arrays:
    Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。
    对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,我们会将其全部塞进内存里,
    那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,
    我们只需要控制每块数据集不超过内存,从而满足In-Memory计算了?Dask就是这样做的。
· Dataframes:
    Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使对大于内存的数据也是能够处理的.
· Bags:
    对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等

四. High Level Graph简介

比如我们在创建Dask Dataframe时,其实是通过HighLevelGraph构建了一个任务图,那么这个任务图是什么?
其实他本质上就是一个字典结构(Dict),从组成元素来看,一共由两部分组成,
一个是动作(可看做是Task Graph中的节点),一个是依赖(可看做是Task Graph中的边)

四. 分布式简介

在Dask 分布式中(也可以是伪分布式,即在本机中通过线程或者进程来并行处理),共有三种角色:Client端,Scheduler端以及Worker端,
· Client负责提交Task给Scheduler
· Scheduler负责对提交的Task按照一定的策略分发给Worker
· Worker进行实际的计算、数据存储,在此期间,Scheduler时刻关注着Worker的状态。

五. 延时计算与即时计算

· Delayed: 
    只需在一个普通的Python Function上面通过dask.delayed函数进行封装,就能得到一个Delayed对象
    只是构建了一个Task Graph,并没有进行实际的计算,只有调用compute的时候,才开始进行计算
· Future:
    对于Future是立即执行的,可以通过submit、map方法将一个Function提交给Scheduler,
    在后台,Scheduler会对提交的任务进行处理并分发给Workers进行实际的计算。
    当任务提交后,会返回一个指向任务运行结果的Key值,即Future对象,
    我们可以跟踪其当前状态,当然我们也可以通过result和gather方法等待任务完成后从而将结果收集到本地

二. 简单使用

· 读取csv
    import dask.dataframe as dd
    df = dd.read_csv('2015-*-*.csv')
    df.groupby(df.user_id).value.mean().compute()
· 读取hdf5
    import dask.array as da
    f = h5py.File('myfile.hdf5')
    x = da.from_array(f['/big-data'], chunks=(1000, 1000))
    x - x.mean(axis=1).compute()
· Dask Bag
    import dask.bag as db
    b = db.read_text('2015-*-*.json.gz').map(json.loads)
    b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
· Dask delayed
    from dask import delayed
    L = []
    for fn in filenames:                  # Use for loops to build up computation
        data = delayed(load)(fn)          # Delay execution of function
        L.append(delayed(process)(data))  # Build connections between variables
    result = delayed(summarize)(L)
    result.compute()
· Client
    client = Client(processes=True)
    from_sequence(seq[, partition_size, npartitions]): Create a dask Bag from Python sequence.
    dask.compute(): Compute several dask collections at once.

参考:

  1. Dask教程: https://www.cnblogs.com/HelloGreen/p/8817863.html
  2. Dask: https://blog.csdn.net/jack_jmsking/article/details/91433854
  3. python︱大规模数据存储与读取、并行计算:Dask库简述: https://blog.csdn.net/sinat_26917383/article/details/78044437
 类似资料: