Apache Airflow — Linux安装

  • Post author:
  • Post category:linux



在这里插入图片描述

在这里插入图片描述


Airflow官网地址:

https://airflow.apache.org/docs/apache-airflow/stable/start/index.html

.

在这里插入图片描述

在这里插入图片描述



官方提供的图片:


在这里插入图片描述

Airflow Python

2.3.0及以上

3.8.10

2.0.2 ~ 2.2.5

3.6.12


2022-06-21安装2.3.0以上版本一直报警,无法解决,降低版本


在这里插入图片描述





1. Python Install

  • **下载python 3.7.8 ,下面的都改成3.6.12,某些地方略微修改 **


    wget https://www.python.org/ftp/python/3.6.12/Python-3.6.12.tgz

  • 解压



    tar -zxvf Python-3.6.12.tgz


    在这里插入图片描述

  • 安装依赖



    sudo yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel libffi-devel

  • 安装



    cd Python-3.6.12



    ./configure -prefix=/usr/local/python3


    在这里插入图片描述


    看到很多config后面加参数的,那都是高版本才有的,低版本都没有参数



    Python config:

    https://docs.python.org/zh-cn/dev/using/configure.html

    .


    在这里插入图片描述

  • 编译



    make && make install

  • 添加软链



    最好先删除软链,请看下面。然后再创建



    sudo ln -s /usr/local/python3/bin/python3.6 /usr/bin/python3



    sudo ln -s /usr/local/python3/bin/pip3.6 /usr/bin/pip3

  • 检验



    python3 -V



    pip3 -V

  • 升级



    pip3 install –upgrade pip

  • 退出python命令



    输入exit(),回车



    输入quit(),回车




2. Mysql Install


CentOS安装mysql真的太难了,花了我一天的时间



Mysql Install:

https://blog.csdn.net/weixin_43916074/article/details/125284109

.




3. Airflow Install




3.1 Run Locally


Running Airflow locally:

https://airflow.apache.org/docs/apache-airflow/2.2.3/start/local.html

.


在这里插入图片描述


  • Airflow needs a home.

    ~/airflow

    is the default, but you can put it



    export AIRFLOW_HOME=~/airflow


  • Install Airflow using the constraints file



    AIRFLOW_VERSION=2.3.2



    PYTHON_VERSION=“$(python –version | cut -d ” ” -f 2 | cut -d “.” -f 1-2)”


  • For example: 3.7



    CONSTRAINT_URL=“https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt”


  • For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.7.txt



    pip install “apache-airflow==${AIRFLOW_VERSION}” –constraint “${CONSTRAINT_URL}”


  • The Standalone command will initialise the database, make a user,and start all components for you.



    airflow standalone




3.2 Pip3 install


  • Airflow needs a home.

    ~/airflow

    is the default, but you can put it



    export AIRFLOW_HOME=~/airflow

  • pip3 install “apache-airflow[celery]==2.2.5” –constraint “https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.6.txt”


  • 当我的环境python = 3.6.12



    sudo pip3 install apache-airflow


    =>自动选择2.2.5版本

在这里插入图片描述


  • 报警


    在这里插入图片描述


    sudo pip3 install --upgrade pip



    sudo pip3 install apache-airflow

  • 进入python安装目录



    cd /usr/local/python3/bin

  • 初始化



    ./airflow

这下面使用airflow都要./airlfow,要建软链,我配置变数都不行。


  • 配置环境变量



    sudo vim /etc/profile



    content :


    export AIRFLOW_HOME=~/airflow

  • 执行配置



    source /etc/profile

  • 检查版本



    airflow version

  • 会报警,说升级更新sqlit


  • 配置环境变量



    sudo vim ~/.bashrc



    content :


    export AIRFLOW_HOME=~/airflow

  • 执行配置



    source ~/.bashrc

  • 检查版本



    airflow version




3.2 Config Mysql


  • 进入mysql



    mysql -u root -p

  • 建数据库



    mysql> create database if not exists airflow default character set utf8 default collate utf8_general_ci;

  • 建用户



    mysql> create user ‘airflow’@‘%’ identified by ‘Airflow@123’;

  • 授权1



    mysql> grant all privileges on airflow.* to airflow@localhost identified by ‘Airflow@123’;


  • 授权2



    mysql> grant all privileges on airflow.* to ‘airflow’@‘%’ identified by ‘Airflow@123’;


  • 授权3



    mysql> flush privileges;

  • 修改配置~/airflow/airflow.cfg,一般都在当前用户下面



    #sql_alchemy_conn = sqlite:data/airflow/airflow.db



    sql_alchemy_conn = mysql://airflow:Airflow@123@localhost:3306/airflow

  • 执行



    airflow version

    在这里插入图片描述

  • 继续安装插件



    sudo yum install -y mysql-devel



    pip3 install pymysql



    pip3 install mysql

  • 初始化



    airflow db init


    在这里插入图片描述

  • Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql



    mysql -u root -p



    mysql > show databases;



    mysql > use airflow;



    mysql > show global variables like ‘%timestamp%’;



    mysql > set global explicit_defaults_for_timestamp =1;



    mysql > exit

    在这里插入图片描述

  • 再次初始化



    airflow db init


    在这里插入图片描述

    在这里插入图片描述

  • 启动服务



    airflow users create -u admin -p admin -f admin -l admin -r Admin -e xxxxx



    airflow webserver –port 8080



    airflow scheduler

  • 访问



    http://ip:8080/




4. HA Environment




4.1 Celery+Redis


  • 安装redis



    yum -y install redis

  • 修改redis配置



    sudo vim /etc/redis.conf



    #bind 127.0.0.1

    // 注释掉,使redis允许远程访问


    requirepass redis

    // 修改这行,redis设置登录密码(自定义,目前是redis)

  • 关闭密码验证



    protected-mode no

  • 启动redis



    sudo systemctl start redis

  • 检查redis服务



    ps -ef | grep redis

  • 测试redis服务



    cd /usr/bin



    redis-cli

    类似于进入mysql


    redis install with ssl:

    https://blog.csdn.net/weixin_43916074/article/details/126470126

    .

  • python下载redis库



    sudo pip3 install redis

  • python下载celery库



    sudo pip3 install celery

  • 修改配置 airflow.cfg



    executor = CeleryExecutor



    broker_url = redis://127.0.0.1:6379/0



    result_backend = redis://127.0.0.1:6379/0


    or


    result_backend = db+musql://username:password@localhsot:3306/airflow

  • 搜索,总能找到一些蛛丝马迹



    sudo find / -name ‘redis*’


    在这里插入图片描述




4.2 Workers


修改master的配置,不要用localhost

[core]
dags_folder = /root/airflow/dags
 
#修改时区
default_timezone = Asia/Shanghai
 
#配置Executor类型,集群建议配置CeleryExecutor
executor = CeleryExecutor
 
# 配置数据库
sql_alchemy_conn=mysql://airflow:Airflow@123@xxipxx:3306/airflow
[webserver]
#设置时区
default_ui_timezone = Asia/Shanghai
 
[celery]
#配置Celery broker使用的消息队列
broker_url = redis://xxipxx:6379/0
#配置Celery broker任务完成后状态更新使用库
result_backend = db+mysql://airflow:Airflow@123@xxipxx:3306/airflow


将master的airflow.cfg覆盖掉其他节点的airflow.cfg

其实就是所有人都共用一个mysql和redis


在master启动webserver,scheduler,在其他node启动wroker


  • master command



    cd /usr/local/python3/bin



    ./airflow webserver --port 8081



    ./airflow scheduler


  • other node command



    ./airflow celery worker


  • 前面都是测试,正式环境都要守护进程启动



    ./airflow webserver -D



    ./airflow scheduler -D



    ./airflow celery worker -D




5. Https




5.1 Close service


  • close service



    ps -ef | grep airflow



    kill -9 xxxx




5.2 Config https

在这里插入图片描述




6. Delete Dags



  • 客制化dags,只需要参考tutorial code就ok啦,如何将其他删掉

  • 通过detail找到路径,全部移除


    在这里插入图片描述

  • 最后效果


    在这里插入图片描述

  • 客制化的直接在airflow的目录删除就好了



    /home/xxxx/airflow/dags




7. Email Notification




7.1 Create DAG

with DAG(
    dag_id='hello_world',
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'owner': 'Nan',
        'depends_on_past': False,
        'email': ['nan.zhao@deltaww.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        'on_failure_callback': failure_callback,   //打开这两个
        'on_success_callback': success_callback,   //打开这两个回调函数
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    tags=['example'],
) as dag:

    # [START documentation]
  
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    download_task= PythonOperator(
        task_id = "hello_world",
        python_callable = demo    //呼叫自定义demo function
    )
 
# [END tutorial]




7.2 Send_email

// 获取airflow.cfg里面的配置参数
def send_email(mail_msg):
    """SMTP Service"""
    mail_host = conf.get("smtp","smtp_host")
    mail_user = conf.get("smtp","smtp_user")
    mail_pass = conf.get("smtp","smtp_password")
    sender = conf.get("smtp","smtp_mail_from")
    port = conf.get("smtp","smtp_port")

    receivers = ['xxxxx.com']

    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = sender
    message['To'] = ";".join(receivers)
    subject = 'Airflow Msg'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtp_obj = smtplib.SMTP(mail_host)
        smtp_obj.connect(mail_host,port)
        smtp_obj.starttls()
        smtp_obj.login(mail_user, mail_pass)
        smtp_obj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException as error:
        print("Error: send failure ,"+ error)

def send_email_fun(msg):
    send_email(msg)




7.3 Call_back

// An highlighted block
def send_email_fun(msg):
    send_email(msg)

 
def success_callback(context: dict):
    dag_id = context['dag'].dag_id
    email = context['dag'].default_args['email']
    schedule_interval = context['dag'].schedule_interval
    task_id = context['task_instance'].task_id
    run_id = context['run_id']
    operator = context['task_instance'].operator
    state = context['task_instance'].state
    duration = '%.1f' % context['task_instance'].duration
    max_tries = context['task_instance'].max_tries
    hostname = context['task_instance'].hostname
    start_date = context['task_instance'].start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date = context['task_instance'].end_date.strftime('%Y-%m-%d %H:%M:%S')
    params = context['params']
    var = context['var']
    test_mode = context['test_mode']
    execution_date = context['logical_date'].strftime('%Y-%m-%d %H:%M:%S')
    next_execution_date = context['data_interval_end'].strftime('%Y-%m-%d %H:%M:%S')
 
    msg = f"""<h3 style='color: green;'>Airflow DAG: {task_id}</h3><tr>
        <table width='1500px' border='1' cellpadding='2' style='border-collapse: collapse'>
        <tr><td width='30%' align='center'>DAG Name</td><td>{dag_id}</td></tr>
        <tr><td width='30%' align='center'>Task name</td><td>{task_id}</td></tr>
        <tr><td width='30%' align='center'>Run Cycle</td><td>{schedule_interval}</td></tr>
        <tr><td width='30%' align='center'>Run ID</td><td>{run_id}</td></tr>
        <tr><td width='30%' align='center'>Task type</td><td>{operator}</td></tr>
        <tr><td width='30%' align='center'>Stask Status</td><td style='color: green;'>Succeed</td></tr></table>
    """
    
    print('succeed')
    send_email_fun(msg)
    print("this is succeed")

def failure_callback(context: dict):
    dag_id = context['dag'].dag_id
    email = context['dag'].default_args['email']
    schedule_interval = context['dag'].schedule_interval
    task_id = context['task_instance'].task_id
    run_id = context['run_id']
    operator = context['task_instance'].operator
    state = context['task_instance'].state
    duration = '%.1f' % context['task_instance'].duration
    max_tries = context['task_instance'].max_tries
    hostname = context['task_instance'].hostname
    start_date = context['task_instance'].start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date = context['task_instance'].end_date.strftime('%Y-%m-%d %H:%M:%S')
    params = context['params']
    var = context['var']
    test_mode = context['test_mode']
    exception = context['exception']
    execution_date = context['logical_date'].strftime('%Y-%m-%d %H:%M:%S')
    next_execution_date = context['data_interval_end'].strftime('%Y-%m-%d %H:%M:%S')
 
    msg = f"""<h3 style='color: green;'>Airflow DAG : {task_id} Error</h3>
        <table width='100%' border='1' cellpadding='2' style='border-collapse: collapse'>
        <tr><td width='30%' align='center'>DAG Name</td><td>{dag_id}</td></tr>
        <tr><td width='30%' align='center'>Task Name</td><td>{task_id}</td></tr>
        <tr><td width='30%' align='center'>Run Cycle</td><td>{schedule_interval}</td></tr>
        <tr><td width='30%' align='center'>Task ID</td><td>{run_id}</td></tr>
        <tr><td width='30%' align='center'>Task Type</td><td>{operator}</td></tr>
        <tr><td width='150px' style='color: red;'>Task Status</td><td style='color: red;'>{state}</td></tr>
        <tr><td width='30%' align='center'>Retry</td><td>{max_tries}</td></tr>
        <tr><td width='30%' align='center'>Time</td><td>{duration}s</td></tr>
        <tr><td width='30%' align='center'>Hostname</td><td>{hostname}</td></tr>
        <tr><td width='30%' align='center'>exe_date</td><td>{execution_date}</td></tr>
        <tr><td width='30%' align='center'>stasrt_date</td><td>{start_date}</td></tr>
        <tr><td width='30%' align='center'>end_date</td><td>{end_date}</td></tr>
        <tr><td width='30%' align='center'>previoue_exe_date</td><td>{prev_execution_date}</td></tr>
        <tr><td width='30%' align='center'>next_exe_date</td><td>{next_execution_date}</td></tr>
        <tr><td width='30%' align='center'>Parmameters</td><td>{params}</td></tr>
        <tr><td width='30%' align='center'>Variable</td><td>{var}</td></tr>
        <tr><td width='30%' align='center'>Mode</td><td>{test_mode}</td></tr>
        <tr><td width='30%' align='cneter'>Task Status</td><td style='color: red;'>{state}</td></tr>
        <tr><td width='150px' style='color: red;'>Error Msg</td><td style='color: red;'>{exception}</td></tr></table>
    """
 
    send_email_fun(msg)
    print("this is failure")




8. Config


airflow.cfg

[core]
dags_folder = /airlfow_home/dags        #客制化的dag目录,用python执行后,自动写入UI界面

default_timezone = Asia/Shanghai        #设置默认的时区

executor    #有以下几个选择,生产一般使用CeleryExecutor
   SequentialExecutor    #按顺序调度,这个一般适用于开发、测试,因为按顺序调度,真的是太慢了。
   LocalExecutor         #local模式,就无法使用集群扩展了。
   CeleryExecutor        #分布式调度
   DaskExecutor          #没用过
   
KubernetesExecutor       #kubernetes方式提交任务,也适用于生产,一般是那些单个任务比较大的情况。

#airflow的元数据库连接串,可以是sqlite、mysql、postgresql,推荐mysql和postgresql:
sql_alchemy_conn = mysql://username:password@localhost:3306/airflow?charset=utf8

parallelism            #所有worker同时运行的task数;
dag_concurrency        #单个dag中同时运行的task数;

max_active_runs_per_dag     #单个dag最多同时运行的dag_run数;

load_examples          #是否加载airflow自带的example dag;

fernet_key             #connection和variable加密用的key,记住,如果你要升级airflow,升级后需要把此参数值copy到新的配置文件中,不然升级后,airflow中的connection和variable加密的值会解密失败,无法查看;

dagbag_import_timeout            #scheduler加载新的dag使用的超时时间;
dag_file_processor_timeout       #scheduler解析dag使用的超时时间;

store_serialized_dags            #是否序列化dag,开启时,scheduler会定时将dag序列化到数据库中存储起来,这样我们的webserver就直接可以查数据库了,更方便快捷高效;
min_serialized_dag_update_interval  #序列化dag的最小间隔时间,也就是说,scheduler序列化所有dag后,隔多长时间再序列化。
store_dag_code                   #是否存储dag代码到数据库;


[webserver]
base_url                         #webserver启动的访问url
web_server_host                  #webserver的host
web_server_port                  #webserver的访问端口

web_server_ssl_cert              #开启https,使用openssl生成的cert 
web_server_ssl_key               #开启https,使用openssl生成的key

web_server_master_timeout        #webserver其实启动的是gunicorn服务,那么webserver master gunicorn服务超过这个时间没有响应,就会抛出异常;
web_server_worker_timeout        #webserver worker服务同理
workers                          #webserver启动的gunicorn服务个数,个数越多,webserver更快,当然根据系统资源来设置

expose_config                    #是否可以在web上查看airflow.cfg;

authenticate              #webserver认证方式,我们可以设置以下几个登录认证方式,这几个方式仅限登录认证:
    - github_enterprise_auth
    - google_auth
    - kerberos_auth
    - ldap_auth
    - password_auth
#当然我们后面还有一个rbac认证模式,这是生产用的最多的方式,因为它包含了web界面上每一个点击按钮的权限
filter_by_owner                   #当我们使用了以上认证方式时,如果filter_by_owner设置为true,那么登录后,界面上显示的dag设置的owner就是跟登录用户一致的。做到了dag的用户隔离;

dag_default_view                  #在我们从web界面进入某个dag时,默认展示的页面,可以设置tree, graph, duration, gantt, landing_times

hide_paused_dags_by_default       #是否隐藏关闭的dag;

page_size                         #web页面主页默认展示的dag个数,设置越少,打开主页速度越快哦;

rbac                              #权限认证模式,开启时,可以做到web界面的权限控制,细粒度到每个dag的每个task的每个操作;

default_dag_run_display_number    #打开dag的tree界面时,默认显示多少个运行批次:越少越好,打开的速度也就越快;



[sentry]
sentry_dsn     #sentry是一个监控系统,可以监控到airflow服务的运行情况

[celery]
worker_concurrency      #单个worker的并发执行task数

broker_url              #broker支持rabbitmq/redis/mysql/postgresql, task执行命令的中转站,scheduler发送执行命令到中转站,然后worker去消费这些task命令,并执行

result_backend          #worker执行完task,将结果存储的位置,官方推荐使用落地的持久化数据库,如mysql、postgresql等

flower_host             #flower是airflow中监控broker和worker的一个服务,有自己的单独页面,可以看到broker情况、task执行情况

flower_port             #flower访问的端口


[scheduler]                   #airflow最重要的角色:调度器,它承担着初始化dag文件、动态编译dag文件、发现调度任务并发送执行命令的责任。我们调度的快慢,也需要从这里设置一些参数,下面来看看。
- job_heartbeat_sec           #任务的心跳监控时间,每隔job_heartbeat_sec秒去监控一下任务的状态
- scheduler_heartbeat_sec     #监控scheduler的状态,每隔scheduler_heartbeat_sec秒去监控scheduler状态
- run_duration                #scheduler运行多长时间停止,-1表示不停止,持续调度
- num_runs                    #每隔dag的调度次数,-1表示无限次
- processor_poll_interval     #解析每个dag之间的间隔时间
- min_file_process_interval   #间隔多长时间一个新的dag被拾起;
- dag_dir_list_interval       #间隔多长时间,scheduler从磁盘读取出dag列表。跟上一个参数min_file_process_interval的区别就是,此参数不解析dag,只获取dag列表。先获取dag列表,在解析dag文件;
- parsing_processes           #编译dag所使用的schedule进程数,这个非常关键,影响到dag的调度速度和task的调度速度,官方推荐cpu盒数-1,但是我们可以设置到2倍的cpu盒数,榨干cpu。




9. Error



  • 启动worker报错:connection in use: (‘0.0.0.0’, 8793)


    在这里插入图片描述

  • 查询8793port是被谁占用,对应的pid



    netstat -npl



    sudo kill -9 127070

    在这里插入图片描述

  • 再次启动


    在这里插入图片描述


    ps -ef 查看服务


    在这里插入图片描述


  • airflow webserver -D 无法使用守护进程启动



    进入airflow安装目录下,remove airflow-webserver.err and airflow-webserver-monitor.pid


    在这里插入图片描述


  • 启动报错:Add Permission to Role Error



    将airflow的版本切换到2.2.3 or 2.2.5


    在这里插入图片描述



版权声明:本文为weixin_43916074原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。