需求:外界传入参数,PythonOperator 根据参数查询数据库得出结果, BashOperator 根据pythonOperator查询的结果当作参数去打包下载HDFS 的 文件。

分析:

(1)provide_context=True : 外界传参数,需要使用PythonOperator 的provide_context=True 来获取外界参数。 provide_context 默认为False, 即不会将产生的数据放入上下问中。

(2) 将provide_context 设置为True后, 在自定义方法的参数中注入 **kwargs ,从**kwargs 中获取外界传来的数据;如在传入的json 数据中kwargs['dag_run'].conf['date_end’] ,获取date_end

(3)在getMacLists 方法中,我使用了SQL 底层的查询方法JdbcHook.get_hook, 获取数据库操作链接。以dateEnd、dateStart、getMacLists方法均以return 方式,返回的值会放入 Xcom 中。放入 Xcom 中 值可供其他Operator 的获取。

(4)数据库连接池 可以在UI 界面的Adminitor -》 Connection 中配置

(5)多看源码

#-*- coding: utf-8 -*-import airflowfrom airflow import DAGfrom datetime import datetime, timedeltafrom airflow.hooks.jdbc_hook import JdbcHookfrom airflow.operators.python_operator import PythonOperatorfrom airflow.operators.bash_operator import BashOperatordefault_args = {'owner': 'airflow','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(0),'retries': 1,'retry_delay': timedelta(minutes=5),}dag = DAG('tarHdfs',default_args=default_args,description='Tar Hdfs File to Local',schedule_interval="@once",)#注入 **kwargs ,从**kwargs 中获取外界传来的数据;def dateEnd(ds, **kwargs):return kwargs['dag_run'].conf['date_end']def dateStart(ds, **kwargs):return kwargs['dag_run'].conf['date_start']def getMacLists(ds, **kwargs):businessid = kwargs['dag_run'].conf['businessid']shopid = kwargs['dag_run'].conf['shopid']mac = kwargs['dag_run'].conf['mac']#businessid = 104#shopid=""#mac=""sql = ""if(mac):return mac + ","elif(shopid):sql= "select mac from device_d where shop_key = " + shopidelif(businessid):sql= "select mac from device_d where business_key = " + businessidelse:passc = JdbcHook.get_hook('mysql_default')rows = c.get_records(sql)result = ""for x in rows:result = result + x[0] + ","return resultt1= PythonOperator(task_id='dateStart',provide_context=True,python_callable=dateStart,dag=dag)t2= PythonOperator(task_id='dateEnd',provide_context=True,python_callable=dateEnd,dag=dag)t3= PythonOperator(task_id='getMacLists',provide_context=True,python_callable=getMacLists,dag=dag)t4 = BashOperator(task_id='jdbc',bash_command="/usr/local/airflow/test/script/rq.sh {{ task_instance.xcom_pull(task_ids='getMacLists') }} {{ task_instance.xcom_pull(task_ids='dateStart') }} {{ task_instance.xcom_pull(task_ids='dateEnd') }} ",dag=dag,)[t1,t2,t3] >> t4

Web 形式触发

curl -X POST \http://cdh3:4321/api/experimental/dags/tarHdfs/dag_runs \-H 'Content-Type: application/json' \-d '{"conf": "{\"date_start\": \"20181122\",\"businessid\": \"\" ,\"shopid\": \"\" ,\"mac\": \"00-1F-7B-00-1A-05\" , \"date_end\": \"20181130\"}" }'

/usr/local/airflow/test/script/rq.sh:

# !/usr/bin/pythonmacs=$1startDate=$2endDate=$3for mactmp in ${macs//,/ };doecho ${mactmp%,*}hdfs dfs -get /test/2018 /data/xchgtags/devicelog/donetar -zcvf logs.tar.gz devicelog

PythonOperator、BashOperator以及Xcom使用相关推荐

  1. XCOM串口助手打印不出数据

    本次实验是在基于原子的战舰开发板上的做定时器捕获实验,程序源码下载到板子上运行正常.指示灯正常显示,打开XCOM识别不来串口,原因:硬件上没有插USB转串口线: 连接上USB转串口线,软件上以显示CH ...

  2. xcom2.0_发布Xcom 2,Elliot Quest,Mesa图形库以及更多开放式游戏新闻

    xcom2.0 您好,开放游戏迷! 在本周的版本中,我们来看看Xcom 2,Elliot Quest,Mesa图形库等的发布. 2016年2月6日至12日每周开放的游戏综述 <冰雪奇缘> ...

  3. VSPD+XCOM+Proteus仿真stm32串口通信

    本文说明 功能要求 开机后,向串口1发送"Welcome" 串口1接收字节指令"0xA1",打开LED1,回传"LED1 ON" 串口1接收 ...

  4. XCOM V2.6 串口打印出来中文乱码,字体格式错乱的一种原因(设置没问题,突然乱码)

    一.问题重现 XCOM V2.6里,完全一样的设置,之前显示的好好的,后来突然中文出现乱码,字体格式也大变,如下图: 二.解决方案 一般这种乱码,往往都是因为波特率等参数设置错误,这类错误网上均有相应 ...

  5. c语言天选之点,【WOTC】天选者之战修改XCOM能力点数方法 意志力恢复加速 更新一个战斗中不掉意志力修改 再更新一个战斗视角缩放增大并固定修改...

    查看: 36591|回复: 60 [攻略] [WOTC]天选者之战修改XCOM能力点数方法 意志力恢复加速 更新一个战斗中不掉意志力修改 再更新一个战斗视角缩放增大并固定修改 游戏狂人, 积分 184 ...

  6. 解决XCOM 工具乱码的问题

    如果XCOM 发出的数据在其他平台上解析出乱码,可能由以下两个原因引起 1:发送数据未勾选16进制发送的按钮导致发送的数据未ASCII码的数据. 2:接收单元的buffer数据未使用unsigned ...

  7. XCOM(串口监视器,无单片机)+ESP8266显示心知天气天气信息

    XCOM(串口监视器,无单片机)+ESP8266显示心知天气天气信息 ESP8266 AT指令显示 这是第一次写博客,写的内容尽量通俗易懂贴近生活. PS:写的不好务必不要打我. ESP8266 一款 ...

  8. 第11次博文;有关下载XCOM串口助手链接

    访问CSDNUP主,链接: (88条消息) XCOM V2.6串口助手_mshlc0728的博客-CSDN博客 免费即可下载,本人只是引流.

  9. 关于串口调试助手XCOM点击发送后卡住问题

    未成功安装CH340驱动(USB串口驱动) 安装前先重启电脑,再点击安装 串口选择错误 打开设备管理器,查看USB连接的端口(COM)号,选择正确的端口(COM)号 波特率.停止位.数据位.奇偶校验位 ...

  10. HDU-4536 XCOM Enemy Unknown 枚举

    题意:有N个国家,每个国家属于一个洲,现在有人要来攻击一些国家,每次攻击选择来自三个不同洲的国家,我们能够选择去保护一个国家,被保护的国家恐惧值-2,其余两个国家恐惧值+2,和这两个国家在一个洲的国家 ...

最新文章

  1. Linux系统 shell基础(二)
  2. ACCP学习旅程之-----使用C#开发数据库应用程序(第二章)
  3. boost::edge_connectivity用法的测试程序
  4. 读书笔记:非营利组织的管理
  5. 两年内赚到60万美元?走近以色列在线攻击服务vDOS
  6. QQ空间自动点赞代码
  7. CentOS修改tomcat端口
  8. 综合管廊:道路工程综合管廊施工方案(图文)
  9. linux进程操作日志文件,我使用过的Linux命令之tailf - 跟踪日志文件/更好的tail -f版本...
  10. WebService系列之HttpClient调用WebService接口
  11. Android实现侧滑recycleView+CardVeiw卡片阴影效果
  12. RAC-iOS中基本用法
  13. 百度地图api周边搜索功能,用单选按钮切换搜索类型
  14. 表删除时 Cannot delete or update a parent row: a foreign key constraint fails 异常处理
  15. 国网项目汇总(ECP)
  16. fidder配合夜神模拟器进行抓包
  17. ELO用户忠诚度评分大赛---异常值的识别和处理
  18. RecyclerView的好朋友 — SnapHelpter
  19. 每次回西安,都会动摇我继续留在深圳的决心
  20. KgoUI(1) 之 技术选型angular 和 vue

热门文章

  1. ubuntu中修改只读文件
  2. mini-smooth-signature.js:小程序canvas带笔锋手写签名,支持微信/支付宝/钉钉/QQ等多平台小程序使用
  3. VsCode+PHP开发 推荐插件
  4. 朋友之间最舒服的关系
  5. 闲置手机不要换锅换盆,你会后悔的
  6. 年会抽奖程序:200行HTML+JavaScript写个桌面程序
  7. SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷
  8. 经典点云配准算法:迭代最近点算法ICP(Iterative Closest Point)
  9. Annoying Present CodeForces - 1009C
  10. 如何查找计算机中的视频文件,win7系统快速搜索查找电脑里的视频文件的操作方法...