关于 Airflow 的介绍,请参考 《ETL:ETL的基本概念》 中 5.3. Airflow 一节的内容。
1. 安装配置
1.1. 安装步骤
sudo apt update
sudo apt install python3-pip
pip3 install --upgrade pip
1.2. 配置文件安装路径及环境变量
export AIRFLOW_HOME=~/airflow
pip3 install apache-airflow
export AIRFLOW_HOME=/home/work/airflow
echo 'export PATH=/usr/local/bin:$PATH' >>~/.bashrc
source ~/.bashrc
1.3. 初始化 airflow
airflow db init
1.4. 配置 webserver
vim /home/work/airflow/airflow.cfg
- 修改 webserver 配置
[webserver] security = Flask AppBuilder secure_mode = True rbac=True
1.5. 重置 airflow
airflow db reset
1.6. 创建账号密码
airflow users create --lastname user --firstname admin --username name --email xxx@xxx.com --role Admin --password password
2. 启动访问
2.1. 启动服务
airflow webserver --port 8080 --hostname 0.0.0.0 # 注意:ubuntu如果有开启防火墙,则需要允许端口8080通过
2.2. web 访问
http://xxx.xxx.xxx.xxx:8080/home
账号:xxxx
密码:xxxx
3. DAGS
3.1. DAG 概念简介
- dags 是用 python 编写的任务调度文件,用于给 airflow 要执行的 task 做任务计划。
3.2. 文件位置
- dags 文件保存在该目录下
/home/work/airflow/dags
3.3. DAGS 样例文件
-
sample.py
# coding: utf-8 from airflow import DAG # from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'admin', 'depends_on_past': False, 'start_date': datetime(2022, 3, 24), # 'email': ['xxx@xxx.com'], # 'email_on_failure': True, # 'email_on_retry': True, 'retries': 3, 'retry_delay': timedelta(seconds=5) } dag = DAG( dag_id='dws_delivery_reach_result_test', default_args=default_args, # schedule_interval="00, *, *, *, *" schedule_interval=timedelta(minutes=1) ) first_task = BashOperator( task_id='dws_delivery_reach_result_test_1', depends_on_past=False, bash_command='python /home/work/airflow/tasks/xxxxx.py', dag=dag ) second_task = BashOperator( task_id='dws_delivery_reach_result_test_2', depends_on_past=False, bash_command='python /home/work/airflow/tasks/xxxxx.py', dag=dag ) first_task >> second_task
3.4. 执行 dags 文件
python sample.py
如果不报错就是执行成功。
#、3.5. 检查 dags 是否生效
airflow dags list
如果 paused
值为 None
,则表示 scheduler 中并未引入 dag。
ag_id | filepath | owner | paused
=================================+==========+=======+=======
dws_delivery_reach_result_test_1 | test.py | admin | False
如果 paused
值为 True
,则表示 dags 生效,在webUI中可以查看到该 dag。
dag_id | filepath | owner | paused
=================================+==========+=======+=======
dws_delivery_reach_result_test_1 | test.py | admin | True
4. TASKS
4.1. 概念简介
- task 是用来做数据清洗同步的具体任务,内容是用 python 编写的 SQL 相关语句,数据清洗工作在此处完成;
4.2. 文件位置
-
task 文件存放在该目录下:
/home/work/airflow/tasks/
4.3. dags 调度任务
注意:
- 使用 BashOperator,以命令行的形式执行
python /home/work/airflow/tasks/xxxxx.py
;
4.4. 频率设置
以下两种方式二选一:
- 定时任务:
schedule_interval="00, *, *, *, *"
- 时间间隔
schedule_interval=timedelta(minutes=1)
5. 注意事项
5.1. dags 文件禁止汉字
- dag 文件中不能出现汉字,否则 dags 不会生效。
5.2. 注意 cfg 文件中默认时间
default_timezone = Asia/Shanghai
5.3. 推荐使用 BashOperator,不要使用 PythonOperator。
- 尽量不要使用 PythonOperator,不然 task 脚本会一直出现 import 导入路径报错。