【AIRFLOW】CROSS DAG 实例
Cross DAG (暂时没有时间详细解释里面的内容)
ExternalTaskSensor
ExternalTaskMarker
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external.html
实际例子:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 9),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('dag_task1',default_args=default_args , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",# external_dag_id="dag_task2",# external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="dag_task2",external_task_id="task2_2",mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2
官网解释:https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/sensors/external_task_sensor/index.html
ExternalTaskSensor 》 execution_delta 实例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 9),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('dag_task1',default_args=default_args , schedule_interval='*/10 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",# external_dag_id="dag_task2",# external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="dag_task2",external_task_id="task2_2",execution_delta=timedelta(minutes=8),mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2
ExternalTaskSensor 》 execution_date_fn 实例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 10),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('sensor_dag_task1',default_args=default_args , schedule_interval='*/10 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",# external_dag_id="dag_task2",# external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="sensor_dag_task2",external_task_id="task2_2",execution_date_fn=lambda dt: [dt + timedelta(minutes=-i) for i in range(0,10,2)],timeout=10,mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('sensor_dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="sensor_dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2
参考:https://luminousmen.com/post/airflow-dag-dependencies
【AIRFLOW】CROSS DAG 实例相关推荐
- 关于Airflow跨DAG依赖总结
关于Airflow跨DAG依赖总结 单个DAG中Task之间的依赖 这是最常见的Task之间的依赖,在DAG中有多种方式指定依赖关系 # 定义DAGdag = DAG( ... ) # 定义task_ ...
- AirFlow官方入门DAG示例
经过前两篇文章的简单介绍之后,我们安装了自己的AirFlow以及简单了解了DAG的定义文件.现在我们要实现自己的一个DAG. 1. 启动Web服务器 使用如下命令启用: airflow webserv ...
- Airflow DAG声明的3种方式
先说明一下我使用的airflow 2.2.4版本 第一种使用标准构造函数,将dag通过参数传递进去 import pendulum from airflow import DAG from airfl ...
- Python笔记 · Airflow中的DAG与With语法
在<Python笔记 · With语法糖>这篇文章中我们提到: 在Airflow中通过With构建DAG时,不必显示地将Operator添加到DAG中,只要是在With语句块内声明的Ope ...
- airflow零基础入门
Airflow 入门 简介 Airflow是什么 Airflow是airbnb开发的一个任务调度平台,目前已经加入apache基金会 Airflow有什么用 Airflow是一个可编程,调度和监控的工 ...
- bitnami如何使用_使用Bitnami获取完全配置的Apache Airflow Docker开发堆栈
bitnami如何使用 I've been using it for around 2 years now to build out custom workflow interfaces, like ...
- airflow mysql_Airflow 使用及原理分析
Airflow 入门及使用 什么是 Airflow?Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台. Airflow 是通过 DAG(Dire ...
- Airflow 中文文档:常见问题
为什么我的任务没有安排好? 您的任务可能无法安排的原因有很多. 以下是一些常见原因: 您的脚本是否"编译",Airflow引擎是否可以解析它并找到您的DAG对象. 要对此进行测试, ...
- Airflow 中文文档:调度和触发器
Airflow调度程序监视所有任务和所有DAG,并触发已满足其依赖关系的任务实例. 在幕后,它监视并与其可能包含的所有DAG对象的文件夹保持同步,并定期(每分钟左右)检查活动任务以查看是否可以触发它们 ...
最新文章
- 【每日一算法】二叉树的最小深度
- 安装完Arch后,要安装的软件
- Javascript Symbol 隐匿的未来之星
- python中bytes转int的实例
- Docker ASP.NET Core (5):Docker Compose
- 【推荐】技术人必看的音视频学习资源清单
- vue 动态设置背景图片
- 用python写web网页_从零开始,使用python快速开发web站点(1) | 学步园
- Write a simple HTML Browser(hdu1088)
- Eclipse中Hibernate插件的安装
- maven远程私服发布jar包
- High Score
- 百问网7天物联网智能家居 学习心得 打卡第七天
- keil编译后显示的Program size含义
- CVE-2022-1388 BIG-IP_POC-YAML
- 机器学习——大规模机器学习
- 推荐几款不错的企业站点,前端设计师寻求设计灵感!
- 人工智能——“kmeans实现图片分割”(Python实现)
- 如何自己制作PCB板(单面板)入门级完全教程
- 《海思Hi35xx开发日记——之No.1》
热门文章
- 修改阿里云docker镜像源
- 基于JavaWeb的保护动物管理系统设计与实现
- 大连出入境管理处办理护照流程
- 做IT的到底是白领还是蓝领?
- python 网页重定向_小试牛刀:python爬虫爬取springer开放电子书.
- swing java_Java Swing 介绍
- 汉源高科万兆1光1电光纤收发器10G万兆光纤收发器万兆网络收发器光端机
- 实验6 单个交换机虚拟局域网
- Numpy中 random.rand() 和random.normal() 的用法
- AndroidStudio 编译中遇到问题总结