Cross DAG (暂时没有时间详细解释里面的内容)

ExternalTaskSensor
ExternalTaskMarker

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external.html

实际例子:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 9),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('dag_task1',default_args=default_args , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",#                              external_dag_id="dag_task2",#                              external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="dag_task2",external_task_id="task2_2",mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2
官网解释:https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/sensors/external_task_sensor/index.html
ExternalTaskSensor 》 execution_delta 实例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 9),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('dag_task1',default_args=default_args , schedule_interval='*/10 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",#                              external_dag_id="dag_task2",#                              external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="dag_task2",external_task_id="task2_2",execution_delta=timedelta(minutes=8),mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2

ExternalTaskSensor 》 execution_date_fn  实例:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensordefault_args = {'owner': 'Zsigner','depends_on_past': False,'start_date': datetime(2020, 11, 10),'retries': 1,'retry_delay': timedelta(minutes=1),
}
start_date = datetime(2020,11,9)
# with DAG('dag_task1',start_date=start_date , schedule_interval='*/1 * * * *',catchup = False) as dag_task1:
with DAG('sensor_dag_task1',default_args=default_args , schedule_interval='*/10 * * * *',catchup = False) as dag_task1:# task1_1 = ExternalTaskMarker(task_id="task1_1",#                              external_dag_id="dag_task2",#                              external_task_id="task2")task1_1 = ExternalTaskSensor(task_id="task1_1",external_dag_id="sensor_dag_task2",external_task_id="task2_2",execution_date_fn=lambda dt: [dt + timedelta(minutes=-i) for i in range(0,10,2)],timeout=10,mode="reschedule")bash_command_task1= """echo task1 running"""task1 = BashOperator(task_id='task1',bash_command=bash_command_task1,dag=dag_task1)task1_1 >> task1# with DAG('dag_task2', start_date=start_date, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:
with DAG('sensor_dag_task2', default_args=default_args, schedule_interval='*/2 * * * *',catchup = False) as dag_task2:task2_2 = ExternalTaskMarker(task_id="task2_2",external_dag_id="sensor_dag_task1",external_task_id="task1_1")bash_command_task2= """echo task2 running"""task2 = BashOperator(task_id='task2',bash_command=bash_command_task2,dag=dag_task2)task2 >> task2_2

参考:https://luminousmen.com/post/airflow-dag-dependencies

【AIRFLOW】CROSS DAG 实例相关推荐

  1. 关于Airflow跨DAG依赖总结

    关于Airflow跨DAG依赖总结 单个DAG中Task之间的依赖 这是最常见的Task之间的依赖,在DAG中有多种方式指定依赖关系 # 定义DAGdag = DAG( ... ) # 定义task_ ...

  2. AirFlow官方入门DAG示例

    经过前两篇文章的简单介绍之后,我们安装了自己的AirFlow以及简单了解了DAG的定义文件.现在我们要实现自己的一个DAG. 1. 启动Web服务器 使用如下命令启用: airflow webserv ...

  3. Airflow DAG声明的3种方式

    先说明一下我使用的airflow 2.2.4版本 第一种使用标准构造函数,将dag通过参数传递进去 import pendulum from airflow import DAG from airfl ...

  4. Python笔记 · Airflow中的DAG与With语法

    在<Python笔记 · With语法糖>这篇文章中我们提到: 在Airflow中通过With构建DAG时,不必显示地将Operator添加到DAG中,只要是在With语句块内声明的Ope ...

  5. airflow零基础入门

    Airflow 入门 简介 Airflow是什么 Airflow是airbnb开发的一个任务调度平台,目前已经加入apache基金会 Airflow有什么用 Airflow是一个可编程,调度和监控的工 ...

  6. bitnami如何使用_使用Bitnami获取完全配置的Apache Airflow Docker开发堆栈

    bitnami如何使用 I've been using it for around 2 years now to build out custom workflow interfaces, like ...

  7. airflow mysql_Airflow 使用及原理分析

    Airflow 入门及使用 什么是 Airflow?Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台. Airflow 是通过 DAG(Dire ...

  8. Airflow 中文文档:常见问题

    为什么我的任务没有安排好? 您的任务可能无法安排的原因有很多. 以下是一些常见原因: 您的脚本是否"编译",Airflow引擎是否可以解析它并找到您的DAG对象. 要对此进行测试, ...

  9. Airflow 中文文档:调度和触发器

    Airflow调度程序监视所有任务和所有DAG,并触发已满足其依赖关系的任务实例. 在幕后,它监视并与其可能包含的所有DAG对象的文件夹保持同步,并定期(每分钟左右)检查活动任务以查看是否可以触发它们 ...

最新文章

  1. 【每日一算法】二叉树的最小深度
  2. 安装完Arch后,要安装的软件
  3. Javascript Symbol 隐匿的未来之星
  4. python中bytes转int的实例
  5. Docker ASP.NET Core (5):Docker Compose
  6. 【推荐】技术人必看的音视频学习资源清单
  7. vue 动态设置背景图片
  8. 用python写web网页_从零开始,使用python快速开发web站点(1) | 学步园
  9. Write a simple HTML Browser(hdu1088)
  10. Eclipse中Hibernate插件的安装
  11. maven远程私服发布jar包
  12. High Score
  13. 百问网7天物联网智能家居 学习心得 打卡第七天
  14. keil编译后显示的Program size含义
  15. CVE-2022-1388 BIG-IP_POC-YAML
  16. 机器学习——大规模机器学习
  17. 推荐几款不错的企业站点,前端设计师寻求设计灵感!
  18. 人工智能——“kmeans实现图片分割”(Python实现)
  19. 如何自己制作PCB板(单面板)入门级完全教程
  20. 《海思Hi35xx开发日记——之No.1》

热门文章

  1. 修改阿里云docker镜像源
  2. 基于JavaWeb的保护动物管理系统设计与实现
  3. 大连出入境管理处办理护照流程
  4. 做IT的到底是白领还是蓝领?
  5. python 网页重定向_小试牛刀:python爬虫爬取springer开放电子书.
  6. swing java_Java Swing 介绍
  7. 汉源高科万兆1光1电光纤收发器10G万兆光纤收发器万兆网络收发器光端机
  8. 实验6 单个交换机虚拟局域网
  9. Numpy中 random.rand() 和random.normal() 的用法
  10. AndroidStudio 编译中遇到问题总结