Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。

文章目录

  • Dask的数据模型概述
  • Collections:Delayed
    • delayed函数
    • Delayed对象和DelayedLeaf对象
  • Graph
    • HighLevelGraph
    • Delayed与Graph的关系
  • Schedulers
    • DaskMethodsMixin
  • 小结

Dask的数据模型概述

在阅读代码前,给大家一些小建议:
15个小技巧包括:
了解作者开发项目的目的
先熟练的使用项目
阅读官方文档
理解项目中的概念
了解项目技术背景
没必要读最新版本的代码
不需要读完所有的源码
版本间比较阅读
自顶向下梳理
自底向上归纳
先做减法,再做加法
从接口找关系
画图辅助阅读
设计模式辅助阅读
debug只是辅助

那么对于Dask,咱们先用一个关键技巧,阅读官方文档,捋一捋Dask的数据模型:Dask官方文档链接
在官方文档首页,就提到了Dask的数据模型

这幅图很好的概述了dask的几个概念与基本原理:

  1. dask目前支持5种主要的数据结构,分别是Array(用于存放类numpy的多维数组),DataFrame(不用多说,类pandas的二维表结构的数据帧),Bag(更简单的一个数组),Delayed(对函数的异步处理封装,针对本地多进程与多线程),Futures(对函数的分布式异步提交处理封装,比delayed多提供网络api)。
  2. Graph,计算图,也有一个比较常见的概念交DAG(有向无环图)。Dask的任务都是先转化成一个个异步任务,并连接成Graph,等到必要的时候才去执行,也就是lazy懒执行的方式来做的。
  3. Scheduer,调度器,既有本机的多进程、多线程、同步调度器,也有分布式任务调度器。

官网给出了几个调用例子,这里我们调整一下啃源码的顺序,上图的collections->task graph->schedulers顺序不变。但我们先看最基本的delayed异步处理,再看futures分布式异步处理,再去读bag->array->dataframe。安排这个顺序的原因是源码阅读技巧之一:“了解项目技术背景”。我们已经用过dask,最简单的dask应用无非就是inc(自增)+add(两数相加)的一个异步调用场景,这样无需了解更多的数据结构与代码。

Collections:Delayed

最简单的一个计算任务:

用Python直接实现,结果都能猜出来吧:

from time import sleep
def inc(x):# 便于观察异步效果sleep(1)return x + 1def add(x, y):sleep(1)return x + y
if __name__ == '__main__':x = inc(1) # 等1秒,x=2y = inc(2) # 等1秒,y=3z = add(x, y) #等1秒,z=5

在python里串行,时长3秒左右,结果是5。
通过看这个简单任务的Graph我们可以发现,其实inc这个自增函数,其实是可以并行的对吧?如果我们多线程或多进程或者分布式跑inc,最后结果汇总后,再执行add,时间是可以缩短到2秒左右的。
那么Delayed的作用,就是通过简单的方法,让我们可以做到并行。修改后的代码如下:

from time import sleep
from dask import delayeddef inc(x):sleep(1)return x + 1def add(x, y):sleep(1)return x + yif __name__ == '__main__':x = delayed(inc)(1)  # dask的命名做的还是不错的,名副其实,delayed就是把我们的任务延期执行,等到整个任务都规划好了,用尽可能好的方法调度,来提高效率,实现并行。y = delayed(inc)(2)z = delayed(add)(x, y)print(z) # 立即得到一个Delayed对象,但任务是延期执行的,并未实际执行。print(z.compute()) # 实际去执行,等待2s左右,会输出5

OK,这里我们先提出几个问题,然后带着问题去肯源码。
1、delayed对add、inc做了什么?或者说把两个函数变成了什么?
2、为什么调用完函数又调用了一次?delayed(inc)(1) 这里的(1)是传递给了谁?
3、z.compute()又发生了什么

delayed函数

好了,带着问题读源码总比没目标的读要强,咱们先debug第一行,注意,这里还未用到scheduer和worker,可以先不启动scheduer和worker进程:
x = delayed(inc)(1)
加了断点直接跳入(step into),会发现我们步入了一个陌生的库,toolz,什么鬼?
大概的代码如下:

# toolz.functoolz
class curry(object):...def __call__(self, *args, **kwargs):try:return self._partial(*args, **kwargs)except TypeError as exc:if self._should_curry(args, kwargs, exc):return self.bind(*args, **kwargs)raise

可恶,半路杀出个程咬金,要不要管呢?建议是先不要管,原因很简单,因为我们的问题中现在没有这个问题,我们可以把问题记下来,放到第二波问题列表里去解决。
我们跳出__call__,再次step in,会看到delayed函数内容:

@curry
def delayed(obj, name=None, pure=None, nout=None, traverse=True):
"""Wraps a function or object to produce a ``Delayed``.``Delayed`` objects act as proxies for the object they wrap, but alloperations on them are done lazily by building up a dask graph internally.。。。
"""if isinstance(obj, Delayed):  # 如果已经是Delayed对象,则直接返回,我们可以猜测,本函数返回的一定是Delayed或它的子类的对象。return objif is_dask_collection(obj) or traverse:task, collections = unpack_collections(obj)  # 分出任务与集合,可以跟到源码里看下,根据注释也能猜个大概:# task就是要执行的任务,可以是普通的python函数,也可能是Delayed对象的_key参数。# collections,合并子图的集合else:task = quote(obj)collections = set()if task is obj: # 任务即本身,一般发生在要转delayed的是一个原python函数的时候。if not (nout is None or (type(nout) is int and nout >= 0)):raise ValueError("nout must be None or a non-negative integer, got %s" % nout)if not name:  # 任务没有名字的话,会生成一个唯一id作为名字try:prefix = obj.__name__except AttributeError:prefix = type(obj).__name__token = tokenize(obj, nout, pure=pure)name = "%s-%s" % (prefix, token)return DelayedLeaf(obj, name, pure=pure, nout=nout)  生成Delayedelse:if not name:name = "%s-%s" % (type(obj).__name__, tokenize(task, pure=pure))layer = {name: task}graph = HighLevelGraph.from_collections(name, layer, dependencies=collections)return Delayed(name, graph)

Delayed对象和DelayedLeaf对象

Delayed对象和DelayedLeaf对象的方法与属性如下图所示。

DelayedLeaf源码不多,我贴出来:

class DelayedLeaf(Delayed):__slots__ = ("_obj", "_key", "_pure", "_nout")def __init__(self, obj, key, pure=None, nout=None):self._obj = objself._key = keyself._pure = pureself._nout = nout@propertydef dask(self):return HighLevelGraph.from_collections(self._key, {self._key: self._obj}, dependencies=())def __call__(self, *args, **kwargs):return call_function(self._obj, self._key, args, kwargs, pure=self._pure, nout=self._nout)

我们例子里给的inc函数会生成一个DelayedLeaf对象:

_key:inc的name的话是<函数名+uuid>作为_key存在了属性中,例如’inc-fb15e064-05f6-4418-9158-ed8551f93a56’
_obj:就是函数体本身
_pure:是否是纯函数,这里先不管,函数纯不纯看一下定义:https://www.jianshu.com/p/400926eeadfb
_nout:返回值的数量,这里先不管。

OK,咱们可以回答第一个问题,

1、delayed对add、inc做了什么?或者说把两个函数变成了什么?
delayed是把inc、add等转成了DelayedLeaf对象,而DelayedLeaf继承自Delayed对象。函数本身存在属性_obj中,并且给了一个唯一id作为key,存在_key属性中。

继续向下debug,到了delayed(inc)(1) 这里的(1),应该是一次调用,这个其实就是调用了DelayedLeaf的__call__方法(源码看上面),它的源码也很简单,返回call_function的结果。那么call_function是返回啥呢?

def call_function(func, func_token, args, kwargs, pure=None, nout=None):dask_key_name = kwargs.pop("dask_key_name", None)pure = kwargs.pop("pure", pure)if dask_key_name is None:name = "%s-%s" % (funcname(func),tokenize(func_token, *args, pure=pure, **kwargs),)else:name = dask_key_nameargs2, collections = unzip(map(unpack_collections, args), 2)collections = list(concat(collections))if kwargs:dask_kwargs, collections2 = unpack_collections(kwargs)collections.extend(collections2)task = (apply, func, list(args2), dask_kwargs)else:task = (func,) + args2graph = HighLevelGraph.from_collections(name, {name: task}, dependencies=collections)nout = nout if nout is not None else Nonereturn Delayed(name, graph, length=nout)

嗯,从源码上看生成了一个新的Delayed对象,name(_key)又生成了一个新的,有一个graph是通过HighLevelGraph.from_collections生成的,然后nout是返回值数量。
这里怎么理解呢?
inc本身生成了一个DelayedLeaf对象,而DelayedLeaf的调用又产生一个全新的Delayed对象,它其实是DelayedLeaf中保存的_obj属性(即inc函数),再加上了传入参数1的一个结合。即把inc(1)变成了一个Delayed对象
call_function函数里还有一个关键,graph。还记得本篇开始的图吗?创建collections 一个最重要的目的就是create task graphs。

Graph

HighLevelGraph

我们重点看一下

    graph = HighLevelGraph.from_collections(name, {name: task}, dependencies=collections)

from_collectionsHighLevelGraph的工厂函数,负责创建HighLevelGraph对象。创建的时候要告诉它name,layer,dependencies。

  • name:就是Delayed的_key(不是DelayedLeaf的_key)
  • layer:可以理解为计算层吧,是一个字典,{name:task}构成的,这儿的task是一个元组,是有(任务,参数)构成,即(inc函数,1)构成的,add的话是由(add, ‘inc-edb7daeb-62f9-4726-84f9-a93b3621a32f’, ‘inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b’)构成的。即只保留了key,value在dependencies里。
  • dependencies:依赖的任务,inc的话是空,它的参数是可以立即获得的,add函数的话,它会依赖两个inc(x)的结果,所以会是[Delayed(‘inc-edb7daeb-62f9-4726-84f9-a93b3621a32f’), Delayed(‘inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b’)]对象列表。

graph类图:

layers是一个字典结构,是把各计算层通过字典结构构建起来的一个更大的字典,里面存储了Delayed对象的_key、具体的执行函数、参数。比如刚才的inc与add的layers:

{'add-28bab8ca-e244-4962-a2af-f93220659ce8': {'add-28bab8ca-e244-4962-a2af-f93220659ce8': (<function __main__.add(x, y)>,'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f','inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b')},'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': {'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': (<function __main__.inc(x)>,1)},'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': {'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': (<function __main__.inc(x)>,2)}}

dependencies里存了依赖关系,我们知道add肯定依赖于inc,打印出来就是

{'add-28bab8ca-e244-4962-a2af-f93220659ce8': {'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b','inc-edb7daeb-62f9-4726-84f9-a93b3621a32f'},'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': set(),'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': set()}

Delayed与Graph的关系

OK,至此Delayed对象内容有什么就清晰了,它有一个key做唯一标识,然后属性里保存了graph计算图,存储在dask属性里。这样我们可以回答第二个问题了

2、为什么调用完函数又调用了一次?delayed(inc)(1) 这里的(1)是传递给了谁?
delayed(inc)是一个Delayed对象,再调一次是把Delayed与参数组合成一个新的Delayed对象。具体存储在Delayed的dask属性中,而这个属性是一个HighLevelGraph,通过字典形式存储了key、task、args等信息以及他们的依赖关系。

dask的代码结构还是比较清晰的,delayed相关的都在dask/delayed.py下,HighLevelGraph相关的代码在dask/highlevelgraph.py下。比较好找。

Schedulers

DaskMethodsMixin

目前我们有了一个Graph,我们如何让它进行计算呢?这里就不得不提到Delayed的父类:DaskMethodsMixin了,它的代码在dask/base.py里面,这里的代码是整个dask源码的基础类或方法所在。

class DaskMethodsMixin(object):def visualize(self, filename="mydask", format=None, optimize_graph=False, **kwargs):# 用graphviz对dask的collections进行可视化,不是咱们的重点,有兴趣的可自行研究...def persist(self, **kwargs):# 持久化,这个是把dask的collections内容都读到内存中。后面我们再研究,暂时先不管...def compute(self, **kwargs):# 对dask的collections执行计算,# 支持的参数包括scheduler,即可以指定用哪种调度器去执行任务,不指定的话也会自动识别scheduer# optimize_graph,是否对graph进行优化后进行,默认会优化(result,) = compute(self, traverse=False, **kwargs)return resultdef __await__(self):# 使用async/await异步处理,先不管。def compute(*args, **kwargs):traverse = kwargs.pop("traverse", True)optimize_graph = kwargs.pop("optimize_graph", True)collections, repack = unpack_collections(*args, traverse=traverse)if not collections:return args# 获取scheduler,跟进去看的话,其实默认会用threaded.get作为scheduerschedule = get_scheduler(scheduler=kwargs.pop("scheduler", None),collections=collections,get=kwargs.pop("get", None),)dsk = collections_to_dsk(collections, optimize_graph, **kwargs)keys, postcomputes = [], []for x in collections:keys.append(x.__dask_keys__())postcomputes.append(x.__dask_postcompute__())results = schedule(dsk, keys, **kwargs)return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

以我们当前的例子,scheduler会使用多线程的方式进行计算。源码在dask/threaded.py中的get函数定义:

def get(dsk, result, cache=None, num_workers=None, pool=None, **kwargs):""" Threaded cached implementation of dask.getParameters----------dsk: dictA dask dictionary specifying a workflowresult: key or list of keysKeys corresponding to desired datanum_workers: integer of thread countThe number of threads to use in the ThreadPool that will actually execute taskscache: dict-like (optional)Temporary storage of resultsExamples-------->>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}>>> get(dsk, 'w')4>>> get(dsk, ['w', 'y'])(4, 2)"""global default_poolpool = pool or config.get("pool", None)num_workers = num_workers or config.get("num_workers", None)thread = current_thread() # 获取当前线程with pools_lock:if pool is None:if num_workers is None and thread is main_thread:# 初次运行,会按CPU核数创建线程池if default_pool is None:default_pool = ThreadPool(CPU_COUNT)atexit.register(default_pool.close)pool = default_poolelif thread in pools and num_workers in pools[thread]:pool = pools[thread][num_workers]else:pool = ThreadPool(num_workers)atexit.register(pool.close)pools[thread][num_workers] = pool# 获取异步结果,注意是会阻塞当前线程的。这里我们不展开看,等我们看完数据模型后,一起看scheduler。results = get_async(pool.apply_async,len(pool._pool),dsk,result,cache=cache,get_id=_thread_get_id,pack_exception=pack_exception,**kwargs)# Cleanup pools associated to dead threadswith pools_lock:active_threads = set(threading.enumerate())if thread is not main_thread:for t in list(pools):if t not in active_threads:for p in pools.pop(t).values():p.close()return results

终于,我们的结果出来了,是5。
所以回答第3个问题:

z.compute()又发生了什么?
z.compute()又发生了什么做了两件主要的事情,1是决定用哪个scheduler,另外就是讲任务提交给scheduler并等待结果。

小结

本篇主要讲解了dask里面第一个数据模型Delayed,让我们大致认识到了dask的工作机制:把计算、数据通过collections,组织成graph,然后交给scheduler。等最后结果回来。
下一篇,我们开始研究另外一个数据模型,Bag。

01 Dask源码剖析-Dask的数据模型-Delayed相关推荐

  1. 02 Dask源码剖析-Dask的数据模型-Bag

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Collection:Bag Bag的创建 从内存序列创建(from_sequence) 从文本文件创建( ...

  2. 03 Dask源码剖析-Dask的数据模型-Array

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Collection:Array Array的创建 from_array加载 Collection:Arr ...

  3. stl源码剖析_STL之set源码剖析

    " 源码面前,了无秘密." 之前在 小bug蕴含大能量 中讲过一个和set相关的bug,说过要从红黑树到STL 红黑树,再到STL set的源码逐步掌握整个知识架构. 最近已经把这 ...

  4. Spring源码剖析——Bean的配置与启动

    IOC介绍   相信大多数人在学习Spring时 IOC 和 Bean 算得上是最常听到的两个名词,IOC在学习Spring当中出现频率如此之高必然有其原因.如果我们做一个比喻的话,把Bean说成Sp ...

  5. LinkedList源码剖析

    1. LinkedList简介 LinkedList是基于双向循环链表(从源码中可以很容易看出)实现的,除了可以当作链表来操作外,它还可以当作栈,队列和双端队列来使用. LinkedList同样是非线 ...

  6. mysql启用组提交变量_MySQL的COMMIT_ORDER模式下组提交分组实现与BUG案例源码剖析...

    背景 自MySQL 5.7以来,组提交大面积应用,已经不断地得到优化.但网上有关组提交的实现机制,却还不够详细.故障多的时候,往往会发生一些模棱两可的揣测和猜疑.因此,笔者有了从自己的角度,去分析组提 ...

  7. webgl编程指南源码_ThreeJS 源码剖析之 Renderer(一)

    引子? 最近,忽然想起曾在 WebGL 基础系列 文章中立下 flag:"后续还打算出 <ThreeJS 源码剖析> 系列"(特意翻出原话?),项目忙了一阵后,便决定开 ...

  8. GDAL源码剖析(四)之命令行程序说明二

    接博客GDAL源码剖析(四)之命令行程序说明一http://blog.csdn.net/liminlu0314/article/details/6978589 其中有个nearblack,gdalbu ...

  9. 转 Spring源码剖析——核心IOC容器原理

    Spring源码剖析--核心IOC容器原理 2016年08月05日 15:06:16 阅读数:8312 标签: spring 源码 ioc 编程 bean 更多 个人分类: Java https:// ...

最新文章

  1. 基于RDKit探索DrugBank(demo)
  2. JS循环精灵图背景-遍历背景图片
  3. 用python简单处理图片(4):图像中的像素访问
  4. [20150805]提升scn4.txt
  5. Sharding-JDBC数据库_垂直切分_Sharding-Sphere,Sharding-JDBC分布式_分库分表工作笔记012
  6. V - 不容易系列之(4)――考新郎(第二季水)
  7. Python 自动化库介绍 PySimpleGUI
  8. .net 3.5 数据库开发 之 LINQ 上
  9. matlab全局变量_MATLAB笔记(一):工具箱的卸载、阻尼振动波形图程序
  10. java实现人脸识别源码【含测试效果图】——前期准备工作及访问提示
  11. 微信域名防封的注意点,微信域名防封系统原理
  12. 经常失眠怎么办?这些方法和好物可以帮到你
  13. 京东上千页面搭建基石——CMS前后端分离演讲史读后感
  14. 如涵控股完成私有化交易:赴美上市刚满两年,市值已缩水超七成
  15. 我的世界不退出服务器切换账号,我的世界服务器退出指令
  16. 彻底掌握 Promise-原生Promise的实现(二) Promise的链式调用
  17. cross-entropy for one-stage detecor
  18. Win10删除文件权限不足的一种可能的解决方式
  19. Chrome已实现对H.265/HEVC的硬解支持
  20. 游戏开发物理引擎PhysX研究系列:运行官方较完整的demo

热门文章

  1. Uncaught (in promise) TypeError: Illegal invocation
  2. 抗击洪涝灾害,河道水雨情动态在线监测解决方案
  3. 拍摄视频知识要领(一)
  4. oracle关联查询取交集,Oracle 取两个表中数据的交集并集差异集合
  5. 你真的了解闰年吗? (附区分 闰年平年的程序示例)
  6. ubuntu 18.04自动更新后分辨率只剩下640x480选项
  7. C语言凯撒密码字母向后偏移三位,凯撒加密解密(java字母移位)
  8. 糖果风格翻盖的智能手机价格仅1700元 是活不下去了?
  9. jQuery RemoveAttr(checked)之后再Attr(checked)属性无效果的原因分析
  10. 运行phpstudy时,显示3306端口被占用