AirFlow简介

AirFlow简介参考文章 AirFlow 简介 堕落门徒 博客园 airflow 实战总结 知乎 1 简介 Airflow 是一个可编程 调度和监控的工作流平台 基于有向无环图 DAG airflow 可以定义一组有依赖的任务 按照依赖依次执行 airflow 提供了丰富的命令行工具用于系统管控 而其 web 管理界面同样也可以方便的管控调度任务 并且对任务运行状态进行实时监控 方便了系统的运维和管理 1 1airflow 介绍 airflow 是一款开源的 分布式任务调度框架 它将一个具有上下

参考文章:AirFlow简介 – 堕落门徒 – 博客园

airflow 实战总结 – 知乎

1, 简介

​ Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

1.1 airflow 介绍 

  • airflow是一款开源的,分布式任务调度框架,它将一个具有上下级依赖关系的工作流,组装成一个有向无环图。
  • 特点:
  • 分布式任务调度:允许一个工作流的task在多台worker上同时执行
  • 可构建任务依赖:以有向无环图的方式构建任务依赖关系
  • task原子性:工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务

1.2 工作流示意图

AirFlow简介

 

  • 一个dag表示一个定时的工作流,包含一个或者多个具有依赖关系的task

1.3 task依赖图

AirFlow简介

1.4 架构图及集群角色

AirFlow简介

  • webserver : 提供web端服务,以及会定时生成子进程去扫描对应的目录下的dags,并更新数据库
  • scheduler : 任务调度服务,根据dags生成任务,并提交到消息中间件队列中 (redis或rabbitMq)
  • celery worker : 分布在不同的机器上,作为任务真正的的执行节点。通过监听消息中间件: redis或rabbitMq 领取任务
  • flower : 监控worker进程的存活性,启动或关闭worker进程,查看运行的task

2,执行器(Executor)

​ Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

  • SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
  • LocalExecutor:多进程本地执行任务
  • CeleryExecutor:分布式调度,生产常用
  • DaskExecutor :动态任务调度,主要用于数据分析

在当前项目使用CeleryExecutor作为执行器。

celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:

AirFlow简介

其中:

  • turing为外部系统
  • GDags服务帮助拼接成dag
  • master节点webui管理dags、日志等信息
  • scheduler负责调度,只支持单节点
  • worker负责执行具体dag中的task, worker支持多节点

在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

3,任务处理器

airflow内置了丰富的任务处理器,用于实现不同类型的任务:

  • BashOperator : 执行bash命令
  • PythonOperator : 调用python代码
  • EmailOperator : 发送邮件
  • HTTPOperator : 发送 HTTP 请求
  • SqlOperator : 执行 SQL 命令

除了这些基本的构建块之外,还有更多的特定处理器:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator 

在当前项目使用了HTTPOperator 作为执行器,用于调用JAVA服务,整体结构图如下:

AirFlow简介
AirFlow简介

关于airflow的环境搭建可以参考另外一篇博客: airflow + CeleryExecutor 环境搭建 – 堕落门徒 – 博客园

4,基本使用

4.1,常用命令

$ airflow webserver -D 守护进程运行webserver 
$ airflow scheduler -D 守护进程运行调度器 
$ airflow worker -D 守护进程运行调度器 
$ airflow worker -c 1 -D 守护进程运行celery worker并指定任务并发数为1 
$ airflow pause dag_id  暂停任务 
$ airflow unpause dag_id 取消暂停,等同于在管理界面打开off按钮 
$ airflow list_tasks dag_id 查看task列表 
$ airflow clear dag_id 清空任务实例 
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 运行整个dag文件 
$ airflow run dag_id task_id execution_date 运行task 

4.2,web管控界面的使用

启动web管控界面需要执行airflow webserver -D命令,默认访问端口是8080

http://110.55.63.51:8080/admin/

AirFlow简介
  1. 任务启动暂停开关
  2. 任务运行状态
  3. 待执行,未分发的任务
  4. 手动触发执行任务
  5. 任务管控界面

选择对应dag栏目,点击(5) Graph View即可进入任务管控界面

AirFlow简介

点击对应的任务,会弹出一个任务管控台,主要几个功能如下:

  • View Log : 查看任务日志
  • Run : 运行选中任务
  • Clear:清空任务队列
  • Mark Success : 标记任务为成功状态

4.3 通过定义DAG文件实现创建定时任务

1) 普通任务

from datetime import timedelta, datetime import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator default_args = { #默认参数 'owner': 'jifeng.si', #dag拥有者,用于权限管控 'depends_on_past': False, #是否依赖上游任务 'start_date': datetime(2018, 5, 2), #任务开始时间,默认utc时间 'email': ['@.com'], #告警通知邮箱地址 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'example_hello_world_dag', #dag的id default_args=default_args, description='my first DAG', #描述 schedule_interval='*/25 * * * *', # crontab start_date=datetime(2018, 5, 28) #开始时间,覆盖默认参数 ) def print_hello(): return 'Hello world!' dummy_operator = DummyOperator(task_id='dummy_task', dag=dag) hello_operator = BashOperator( #通过BashOperator定义执行bash命令的任务 task_id='sleep_task', depends_on_past=False, bash_command='echo `date` >> /home/py/test.txt', dag=dag ) dummy_operator >> hello_operator #设置任务依赖关系 #dummy_operator.set_downstream(hello_operator) 

2) 定义http任务并使用本地时间

import os from datetime import timedelta, datetime import pytz from airflow.operators.http_operator import SimpleHttpOperator from airflow.models import DAG default_args = { 'owner': 'cord', # 'depends_on_past': False, 'depends_on_past': True, 'wait_for_downstream': True, 'execution_timeout': timedelta(minutes=3), 'email': ['@.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } #将本地时间转换为utc时间,再设置为start_date tz = pytz.timezone('Asia/Shanghai') dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz) utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None) os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090' dag = DAG( 'bm01', default_args=default_args, description='my DAG', schedule_interval='*/2 * * * *', start_date=utc_dt ) #通过SimpleHttpOperator定义http任务 task1 = SimpleHttpOperator( task_id='get_op1', http_conn_id='http_test', method='GET', endpoint='test1', data={}, headers={}, dag=dag) task2 = SimpleHttpOperator( task_id='get_op2', http_conn_id='http_test', method='GET', endpoint='test2', data={}, headers={}, dag=dag) task1 >> task2 

4.4 crontab语法

crontab格式如下所示:

# ┌───────────── minute (0 - 59) # │ ┌───────────── hour (0 - 23) # │ │ ┌───────────── day of month (1 - 31) # │ │ │ ┌───────────── month (1 - 12) # │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday; # │ │ │ │ │ 7 is also Sunday on some systems) # │ │ │ │ │ # │ │ │ │ │ # * * * * * command to execute 
是否必须 取值范围 可用特殊符号 备注
Minutes Yes 0–59 * , -
Hours Yes 0–23 * , -
Day of month Yes 1–31 * , - ? L W ? L W部分实现可用
Month Yes 1–12 or JAN–DEC * , -
Day of week Yes 0–6 or SUN–SAT * , - ? L # ? L W 部分实现可用
Year No 1970–2099 * , - 标准实现里无这一项

特殊符号功能说明:

逗号(,)
​ 逗号用于分隔一个列表里的元素,比如 “MON,WED,FRI” 在第五域(day of week)表示Mondays, Wednesdays and Fridays。

连字符(-)
​ 连字符用于表示范围,比如2000–2010表示2000到2010之间的每年,包括这两年(闭区间)。

百分号(%)
​ 用于命令(command)中的格式化

L
​ 表示last,最后一个,比如第五域,5L表示当月最后一个星期五

W
W表示weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为15L ,这表示临近当月15附近的工作日,假如15号是星期六,那么定时器会在14号执行,如果15号是星期天,那么定时器会在16号执行,也就是说只会在离指定日期最近的那天执行。

井号#
#用于第五域(day of week),#后面跟着一个1~5之间的数字,这个用于表示第几个星期,比如5#3表示第三个星期五

?
​ 在有些实现里面,*的功能相同,还有一些实现里面?表示cron的启动时间,比如 当cron服务在8:25am启动,则? ? * * * *会更新为25 8 * * * *, 直到下一次cron服务重新启动,定时器会再次更新。

/
/一般与*组合使用,后面跟着一个数字,表示频率,比如在第一域(Minutes)中*/5表示每5分钟,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的缩写

https://en.wikipedia.org/wiki/Cron

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/228325.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午7:17
下一篇 2026年3月16日 下午7:18


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号