airflow配置信息

  • Airflow 1.10+安装
  • 整体流程
    • 建库、建用户
    • Airflow安装
    • airflow 配置
      • 1. 安装Mysql模块
      • 2. 用户认证
      • 3. 配置邮件服务
      • 4、配置Executor
      • 5. 修改log地址
      • 6. 修改webserver地址
      • 7. 可选配置
      • 8.celery配置信息
      • 9.scheduler配置信息
  • 调度程序尝试触发新任务的时间
    • 运行airflow
    • 安装问题汇总
    • 配置任务

Airflow 1.10+安装

本次安装Airflow版本为1.10+,其需要依赖Python和DB,本次选择的DB为Mysql。
本次安装组件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7

整体流程

  1. 建表
  2. 安装
  3. 配置
  4. 运行
  5. 配置任务
启动schedule
airflow scheduler -D
启动webserver
airflow webserver -Dps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}

建库、建用户

库名为airflow
‘create database airflow;’
建用户

用户名为airflow,并且设置所有ip均可以访问。

create user ‘airflow’@’%’ identified by ‘airflow’;
create user ‘airflow’@‘localhost’ identified by ‘airflow’;

用户授权
这里为新建的airflow用户授予airflow库的所有权限

grant all on airflow.* to 'airflow'@'%';
flush privileges

Airflow安装

这里通过 virtualenv 进行安装。

----- 通过virtualenv安装

$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录
$ virtualenv --no-site-packages airflow --python=python # 创建虚拟环境
$ source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境

----- 安装指定版本或者默认
$ pip install apache-airflow -i https://pypi.douban.com/simple
在安装完一堆的依赖后,就需要配置 AIRFLOW_HOME 环境变量,后续的 DAG 和 Plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。

报错
ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you’ll have jinja2 2.10 which is incompatible.
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you’ll have werkzeug 0.14.1 which is incompatible.

执行:pip3 install -U Flask==1.0.4
执行:pip3 install -U pika==0.13.1

重新执行 :pip install apache-airflow -i https://pypi.douban.com/simple

----- 设置环境变量

(airflow) $ export AIRFLOW_HOME=/tmp/airflow

----- 查看其版本信息
(airflow) $ airflow version


____ |( )_______ / /________ __
____ /| |_ /__ / / __ / __ _ | /| / /
___ ___ | / _ / _ / _ / / // / |/ |/ /
// |// // // // _
/____/|__/
v1.8.0
执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。

airflow 配置

----- 修改Airflow DB配置

1. 安装Mysql模块

pip install “apache-airflow[mysql]”
这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。

修改Airflow DB配置
修改${AIRFLOW_HOME}/airflow.cfg

sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
参数的格式为mysql://帐号:密码@ip:port/db

初始化db
新建airflow依赖的表。

airflow initdb如报错 Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)
需改sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow

2. 用户认证

本文采用的用户认证方式为password方式,其他方式如LDAP同样支持但是本文不会介绍。笔者在安装时实验过LDAP方式但是未成功过。

安装passsword组件
pip install "apache-airflow[password]"
2. 修改 airflow.cfg[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
3. 在python环境中执行如下代码以添加账户:import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin'  # 用户名
user.email = 'emailExample@163.com' # 用户邮箱
user.password = 'password'   # 用户密码
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

3. 配置邮件服务

此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com
接下来简单把dag的Python代码列出来,以供参考:default_args = {'owner': 'ownerExample','start_date': datetime(2018, 9, 18),'email': ['mailReceiver@163.com'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。'email_on_failure': ['mailReceiver@163.com'], # 任务失败且重试次数用完时发送Email。'email_on_retry': True, # 任务重试时是否发送Email'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。'retries': 3,'retry_delay': timedelta(minutes=3),
}

4、配置Executor

设置Executor
修改:airflow.cfg

executor = LocalExecutor
本文中由于只有单节点所以使用的是LocalExecutor模式。

5. 修改log地址

[core]
# dags存放路径
dags_folder = /home/xxx/airflow/dags
# 获取服务器IP的方式
hostname_callable = socket.getfqdn
# 时区,可以是UTC(默认),也可以换成国内Asia/Shanghai
default_timezone = utc
# airflow支持并行性的工作器,有`SequentialExecutor`(默认,顺序执行), `LocalExecutor`(本地执行), `CeleryExecutor`(远程执行), `DaskExecutor`
executor = SequentialExecutor
# 数据库连接设置
sql_alchemy_conn = sqlite:////home/xxx/airflow/airflow.db
# 数据库编码方式
sql_engine_encoding = utf-8
# 是否与SqlAlchemy库进行数据交互
sql_alchemy_pool_enabled = True
# 最大数据库连接数
sql_alchemy_pool_size = 5
# 控制每个Airflow worker可以同时运行task实例的数量
parallelism = 32
# 用来控制每个dag运行过程中最大可同时运行的task实例数,若DAG中没有设置concurrency,则使用默认值
dag_concurrency = 16
# 创建新的DAG时,是否暂停
dags_are_paused_at_creation = True
# 同一时间最大运行dag的数量,默认为16
max_active_runs_per_dag = 16
# 加载示例dags,默认为True
load_examples = True[scheduler]
child_process_log_directory = servers/logs/airflow/scheduler

6. 修改webserver地址

修改webserver地址
[webserver]
base_url = http://host:port
可以通过上面配置的地址访问webserver。

7. 可选配置

(可选)修改Scheduler线程数
如果调度任务不多的话可以把线程数调小,默认为32。参数为:parallelism(可选)不加载example dag
如果不想加载示例dag可以把load_examples配置改为False,默认为True。这个配置只有在第一次启动airflow之前设置才有效。如果此方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。(可选)修改检测新dag间隔
修改min_file_process_interval参数为10,每10s识别一次新的dag。默认为0,没有时间间隔。

8.celery配置信息

#  配置celery的broker_url(存储要执行的命令然后celery的worker去消费)
broker_url = redis://redis:6379/0
# 配置celery的result_backend(存储任务执行状态)、 也可以用redis存储
result_backend = db+postgresql://postgres:airflow@postgres/airflow

9.scheduler配置信息

调度程序尝试触发新任务的时间

scheduler_heartbeat_sec = 60
# 检测新dag的时间
min_file_process_interval = 10
# 是否使用catchup功能, 即是否执行自上次execute_date以来所有未执行的DAG Run, 另外定义每个DAG对象可传递catchup参数进行覆盖
catchup_by_default = True

运行airflow

启动schedule
airflow scheduler -D
启动webserver
airflow webserver -D

安装问题汇总

1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql修改Mysql配置文件my.cnf,具体步骤如下:查找my.cnf文件位置
mysql --help | grep my.cnf
下图红框处为my.cnf文件所在位置:修改文件
explicit_defaults_for_timestamp=true
注意:必须写在【mysqld】下重启Mysql
sudo systemctl restart mysqld.service
查看修改是否生效。执行如下SQL,如果值为1则为生效。2. pip install "apache-airflow[mysql]"报错:mysql_config not found安装mysql-devel:首先查看是否有mysql_config文件。
find / -name mysql_config如果没有安装mysql-devel
yum install mysql-devel

配置任务

在 AirFlow 中,每个节点都是一个任务,可以是一条命令行 (BashOperator),可以是一段 Python 脚本 (PythonOperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 DAG 。

默认会到 ${AIRFLOW_HOME}/dags 目录下查找,可以直接在该目录下创建相应的文件。

如下是一个简单的示例。

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, datetime
import pytz# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initializationdefault_args = {'owner': 'qxy','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)dag = DAG('airflow_interval_test',default_args=default_args,description='airflow_interval_test',schedule_interval='35 17 * * *',start_date=utc_dt)t1 = BashOperator(task_id='sleep',bash_command='sleep 5',dag=dag)t2 = BashOperator(task_id='print_date',bash_command='date',dag=dag)t1 >> t2

该文件创建一个简单的 DAG,只有三个运算符,两个 BaseOperator ,也就是执行 Bash 命令分别打印日期以及休眠 5 秒;另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。
文件创建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自动读取该DAG。

----- 测试是否正常,如果无报错那么就说明正常

$ python /tmp/project/dags/hello_world.py

airflow配置相关相关推荐

  1. Linux网络属性配置相关命令

    Linux网络属性配置相关命令: 前言: Linux属性配置可以分为两类.一类通过命令配置,另一类通过修改配置文件配置. Linux属性配置的相关命令可以分为三大类: 一.ifcfg命令家族:①ifc ...

  2. memcached的基本命令(安装、卸载、启动、配置相关)

    memcached的基本命令(安装.卸载.启动.配置相关): -p 监听的端口  -l 连接的IP地址, 默认是本机   -d start 启动memcached服务  -d restart 重起me ...

  3. Docker部署配置相关使用总结

    Docker部署配置相关使用总结 创建并启动容器 使用 docker run 命令来创建并启动一个容器: $ docker run -it centos /bin/echo 'hello world' ...

  4. 计算机的相关配置信息,win7系统查看电脑配置相关信息的方案介绍

    win7系统使用久了,好多网友反馈说win7系统查看电脑配置相关信息的问题,非常不方便.有什么办法可以永久解决win7系统查看电脑配置相关信息的问题,面对win7系统查看电脑配置相关信息的图文步骤非常 ...

  5. nodeJs配置相关以及JSON.parse

    nodeJs配置相关 实际上说应用相关更好吧,我不是很懂. 今天在工作中,被同事解决了一个问题,虽然多花了一些额外时间,但长痛不如短痛嘛 实际上的问题就是npm run target等命令可以,但是n ...

  6. GBrowse配置相关资料

    GBrowse配置相关资料(形状.颜色.配置.gff3) http://gmod.org/wiki/Glyphs_and_Glyph_Options http://gmod.org/wiki/GBro ...

  7. Nginx配置相关结构划分的技巧和禁止IP访问

    Nginx配置相关结构划分的技巧 原文来自:http://developer.51cto.com/art/201003/190953.htm Nginx配置需要一定的技巧,我们在不断的使用和维护中就会 ...

  8. 【树莓派】服务配置相关3:基于Ubuntu Server的服务配置

    该文接续之前写过的两篇: [树莓派]服务配置相关 [树莓派]服务配置相关2:基于RPi Desktop的服务配置 这是我个人用来进行树莓派盒子安装配置的脚本,对于外部其他博友,可以部分参考,但不需要逐 ...

  9. centOS中网络配置相关文件配置选项说明

    2019独角兽企业重金招聘Python工程师标准>>> CentOS 修改IP地址等网络相关的配置文件 找到对应网卡的IP地址配置文件:ifcfg-eth数字. 路径:/etc/sy ...

最新文章

  1. CSS中的四种样式及选择器
  2. python中安装一个第三方库的命令格式是-无法使用pip命令安装python第三方库的彻底解决方案...
  3. RSS FEED的应用
  4. 安装配置Kali虚拟机----linux----kali
  5. JVM执行篇:使用HSDIS插件分析JVM代码执行细节--转
  6. javascript 模板引擎基本原理
  7. Python之数据分析(Numpy的数组切片、数组变维、组合与拆分)
  8. java sender_Spring Boot用JavaMailSender发送邮件方法
  9. ecshop始终显示全部分类
  10. xp配置iis和php,XP下让IIS支持PHP
  11. python异步IO编程(一)
  12. SpringBoot常见面试题总结二
  13. Win10右键新建中没有新建文件夹,电脑右键新建文件夹不见了
  14. word文件转pdf转换器11.0注册码
  15. 网络协议之NAT穿透原理
  16. 基于android手机实时监控ipcam视频之三:H.264的RTP打包解析
  17. 《抽样技术》第4章 等概率整群抽样和多阶段抽样
  18. define不是c语言语句,define是语句
  19. uniapp 打包安卓 Android 抖音app 前端篇~01
  20. Android studio小能手之色卡对照表

热门文章

  1. 运维 --- Nginx介绍和在CentOS/Redhat下安装
  2. TemporalAdjusters.previous
  3. bdp mysql支持_bdp个人版到底是什么神器?
  4. 企业和个人怎么做软文投放?软文投放需要注意什么
  5. fill和memset函数(C++)
  6. NB-Iot烟感07:NB-IOT 无线通讯程序开发
  7. Hive学习(15)-Hive分析窗口函数(三)
  8. 基于FRR全面解析BGP协议(五):FRR的BGP路由策略
  9. jar中没有主清单属性 bug解决方法
  10. Python xlrd库的使用示例