一、Airflow简介

Airflow是一个编排、调度和监控workflow的平台。Airflow的核心概念有五个:

DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。

Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。

Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。

Task Instance:task的一次运行。task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖于TaskA。

二、我们用Airflow做些什么?

只要符合定时任务流的工作,都可以用Airflow来实现。我们主要用Airflow来实现定时的ETL处理。

三、为什么要用Airflow?

既然是主要使用其定时器的功能,为什么不直接写一个定时器而要用Airflow这种很占用资源的工具呢?Airflow除了可以很方便的组织任务流中每个环节的处理之外,还有一个非常重要的优势在于任务流的管理。特别是我们现在的ETL处理中,除了可能依赖外部的数据来源之外,还经常要调用模型的执行API(在IT层面也当成外部接口使用)。这些接口未必可控(比如外部数据来源,可能出现API服务掉线或者断网情况),当出现各种不可控问题导致任务流断裂的情况的时候,自己写的定时器就难以方便的、可视化的操作数据数据补回了。还有就是任务之间的依赖情况,在任务流的实现过程中,如果自己写定时器,这方面也需要有大量的考虑,而这些考虑Airflow已经帮你完成了。所以我们在使用Airflow的时候,也需要注意利用他在这两方面的优势,写出我们灵活,容错率高的工作流。毕竟没有bug的代码是难以实现的,每当写一段代码的时候,都要考虑这段代码会不会出bug,他所调用的方法和接口会不会出bug,当出bug的时候怎样弥补。【这也是写所有代码时候要思考的内容,有点啰嗦了】

四、基础配置airflow.cfg

安装好Airflow,第一次运行 airflow initdb 之后,会在Airflow文件夹下面产生一个airflow.cfg文件,这个就是基础配置文件。我们以这个基础文件作为模板来修改成为我们需要的配置文件。以下的操作都是找到对应的配置字段,修改其字段内容。

修改默认时区:default_timezone = Asia/Shanghai,说明:修改时区之后,Airflow前端页面仍旧会使用UTC时区显示,但是配合主机/容器的时区,这样我们在写dag任务执行时间的时候就不需要转换时区了。

修改执行器类型:executor = CeleryExecutor

不加载范例dag:load_example = False

不让同个dag并行操作:max_active_runs_per_dag = 1,说明:在ETL过程中,还是线性执行会比较好控制,如果里面需要批量操作,可以在ETL的具体处理过程中加入多线程或者多进程方式执行,不要在dag中体现

最高的dag并发数量:dag_concurrency = 16,说明:一般配置成服务器的CPU核数,默认16也没问题。

最高的任务并发数量:worker_concurrency = 16,说明:CeleryExecutor在Airflow的worker线程中执行的,这里配置的是启动多少个worker

数据库配置:sql_alchemy_conn = mysql://airflow:airflow@127.0.0.1:3306/airflow?charset=utf8,说明:我们一般是用MySQL来配合Airflow的运行

Celery Broker:broker_url = redis://:password@127.0.0.1:6379/0,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。

Celery Result backend:result_backend = redis://:password@127.0.0.1:6379/1,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。

五、MySQL需要注意的地方

mysql的配置中需要加入以下内容,不然执行会报错。需要在initdb之前加入并重启。

[mysqld]innodb_large_prefix = onexplicit_defaults_for_timestamp = 1六、运行

由于使用的是CeleryExecutor,需要顺序执行三个进程:airflow webserver -Dairflow scheduler -Dairflow worker -D

七、一些技巧分享7.1 利用provide_context在任务间传递信息

在default_args里面配置'provide_context': True,这样在每个任务执行完之后都可以返回一个信息(当你需要的时候)。这样每个任务都可以获取到之前任务执行返回的信息,以进行自身的处理操作。以下是一个简单的例子:

#-------------------------------------------------------------------------------# 任务1,获得数据并保存到文件中,返回文件名def job_get_datas(**kwargs): filename = get_datas() # 数据获取的函数,返回的是存储数据的文件名 return filenameoperator_get_datas = PythonOperator( task_id='task_get_datas', python_callable=job_get_datas, dag=dag) #-------------------------------------------------------------------------------# 把存储文件的数据导入数据库def job_data_2_mysql(**kwargs): filename = kwargs['task_instance'].xcom_pull(task_ids='task_get_datas') # 获取task_get_datas任务返回的数据 result = data_2_mysql(filename) # 数据入库的函数 return result operator_data_2_mysql = PythonOperator( task_id='task_data_2_mysql', python_callable=job_data_2_mysql, dag=dag)

注意:由于这里的上下文信息(任务返回的数据)是存到Airflow的MySQL中,字段长度有限,所以不推荐返回具体数据,而是通过其他途径存储临时数据(例如临时文件形式),返回关键信息(例如临时文件的文件名),这样既不会因为异常断开导致整个任务流需要重跑,也不会因为数据量过大导致Airflow存储MySQL的时候报错。

7.2 处理逻辑与任务流执行分离

虽然在dag里面可以直接写python代码(Airflow本身也是用python实现的),但是不推荐将处理逻辑写在dag上面。这里有两方面的考虑:

在Airflow的前端界面中,是可以看到dag的代码的,将处理逻辑、特别是数据库或其他服务的用户密码暴露出来未必是好事;

如果将逻辑写在dag里面,那么在测试逻辑的时候,就太依赖Airflow了。这与解耦的开发逻辑思路相违背了,我们是需要一个松耦合的代码世界。

那么推荐在项目下面添加一个etl_utils目录(或者你喜欢的名称),用于存放处理逻辑。这个目录下一般分成三个子目录config、etl、system,分别是配置信息(数据库密码等)、逻辑代码、通用工具(如封装好的es操作类)。那么一般项目的目录结构如下:

-/dag_xxx.py-/test_xxx.py-/etl_utls/-/etl_utls/config/...-/etl_utls/etl/...-/etl_utls/system/...

所有的文件之间的调用层级以根目录为起点。我们在实现逻辑之后,就可以在根目录下编写测试代码,按顺序执行我们需要实现的流程。按这种方式测试完流程之后再组织dag。

7.3 关于中间数据

在处理逻辑中,我们尽量将每个处理过程细分出来,每个处理完成之后都将数据保存到临时文件中(中间处理过程,一般不要存数据库了,加大数据库的存取压力不是一件好事情,而且这些都是临时的信息),这些文件可以是同一个文件进行反复覆盖(每个任务流都取一个相对唯一的文件名,例如使用uuid,或者第一次处理的时间戳,加上任务流名字作为唯一辨识)。千万不要将这些信息放在内存里,万一挂了,就找不回来了,又要整个流程重新跑过。

7.4 临时文件

临时文件,注意同个任务流中保持一致,但是在不同任务流中需要能区分,有时候上一个任务流失败了,下一个任务流继续执行,那么如果没有区分能力,就会把上一个任务流的数据给覆盖掉了。注意在最后加上一个删除文件的处理,减少系统空间压力。

7.5 关于处理频率

机器的处理能力总是有限的,所以我们在条件允许的情况下,每次处理的数据量尽量减小。一般减小每次处理的数据量的方法,就是增加处理频率。但是加大处理频率,又会加大Airflow自身运行需要占用的资源。所以需要在数据量和频率之间找到一个平衡,这里每个项目可能有自己的特点,需要在每个项目的实际情况中找到适合项目的处理频率。

python airflow_Airflow使用经验分享相关推荐

  1. Python培训教程分享:10款超好用的Python开发工具

    学会Python技术后,我们在参加工作的时候如果能有辅助工具的话,那么会很大程度的提高我们的工作效率,那么Python都有哪些好用的开发工具呢?下面小编就为大家详细的介绍一下10款超好用的Python ...

  2. Python培训教程分享:Python异常机制

    ​ 在学习Python技术的时候,我们经常会遇到一些异常,例如导致程序在运行过程中出现的中断或退出,我们都称之为异常,大多数的异常都不会被程序处理,而是以错误信息的形式展现出来.本期Python培训教 ...

  3. Python培训教程分享:有哪些值得使用的爬虫开源项目?

    相信很多同学在学习Python技术的时候,都有学习到Python爬虫技术,爬虫技术在各大互联网公司都是非常常见的,可以帮助我们获取各种网站的信息,比如微博.B站.知乎等,本篇Python培训教程分享为 ...

  4. Python培训教程分享:visual studio编写python怎么样?

    本期小编要为大家介绍的Python培训教程就是关于"visual studio编写python怎么样?"的问题,但答案当然是可以的,据了解,vs2017.vs2019都集成了pyt ...

  5. Python培训教程分享:Python中选择结构是什么

    越来越多的人开始报名学习Python技术,那么学习Python技术不是一两天就能学会的,本期小编为大家推荐的Python培训教程主要讲的是"Python中选择结构是什么",下面来看 ...

  6. Python培训教程分享:Python模块如何导入__all__属性?

    本期小编为大家带来的Python培训教程是关于"Python模块如何导入__all__属性?"的内容,后面在工作中是会遇到Python模块这个工作内容的,Python模块的开头通常 ...

  7. Python培训教程分享:如何实现pygame的初始化和退出操作?

    本期小编为大家介绍的Python培训教程是关于"如何实现pygame的初始化和退出操作?"的内容,pygame模块针对不同的开发需求提供了不同的子模块,例如显示模块.字体模块.混音 ...

  8. Python培训教程分享:“高效实用” 的Python工具库

    作为一名合格Python技术员,对于Python工具库的使用是少不了的,本期Python培训教程就为大家分享的是""高效实用" 的Python工具库",希望能够 ...

  9. 单相计量芯片RN8209D使用经验分享(转)

    单相计量芯片RN8209D使用经验分享 转载于:https://www.cnblogs.com/LittleTiger/p/10736060.html

最新文章

  1. clickhouse 基础知识
  2. 【c++】iostreeam中的类为何不可以直接定义一个无参对象呢
  3. 汽车之家APP车型配置--参数分析
  4. linux 日志监控工具,详解 Linux系统常用监控工具
  5. 如何准备启动敏捷-迭代0如何做?
  6. 微信小程序 - 文字收缩与展开
  7. 四、Web服务器——Session Cookie JSP入门 学习笔记
  8. [读书笔记] 美的历程
  9. 前沿 | 谷歌用深度学习进行深度预测
  10. Typora本地图片上传
  11. DataBseDesign工作笔记003---ERStudio使用笔记_基本使用方法详解
  12. 看到这个我冷汗直冒,还好不是指我……
  13. java获取当月1号 的时间chuo_java获取时间戳的方法
  14. DirectSound学习笔记(4):设备性能
  15. 几个负载均衡软件比较(Haproxy vs LVS vs Nginx)
  16. 八天学会Ansys命令流
  17. 单因素模糊评价matlab,matlab学习系列21模糊综合评价.docx
  18. dateutil 日期计算_时间工具——DateUtil
  19. jquery boxy
  20. android 自定义太阳,第一个AOSP安卓10自定义ROM已经可用,并且非常稳定

热门文章

  1. 修改Chrome谷歌浏览器默认安装路径
  2. 上投摩根灵魂人物吕俊辞职
  3. python多维数组拟合_python归一化多维数组的方法
  4. 预警短信python_zabbix 利用python脚本实现短信告警
  5. uwb定位技术-工厂人员定位首选
  6. HTML5动画如何最高效地制作?
  7. JAVA继承的综合案例——群主发普通红包
  8. 计算机专业名称bios翻译,BIOS中各英文的意思是什么?BIOS英文全翻译
  9. AT+CSQ信号质量指示含义
  10. learning at command AT+CSQ