airflow 源码调试之源码运行(一)

  • Post author:
  • Post category:其他




airflow 源码调试之源码运行(一)


开发环境

  • windows上的ubuntu16.0虚拟机
  • Python3.8


调试准备

  • Pycharm 配置远程连接主机
  • 配置使用远程主机Python解释器

没有配置过的可以查看我这篇文章

Pycharm配置使用远程解释器调试远程代码


1.拉取源码

  1. 从GitHub上拉取airflow 的最新源码(2.02版本):https://github.com/apache/airflow.git

  2. 关于airflow版本 与 对应Python 版本,数据库版本等依赖的关系可以从GitHub上查看:

    在这里插入图片描述


2.安装源码第三方库依赖

  1. 从GitHub上可以查看依赖下载链接:

    在这里插入图片描述

  2. 但是个人在根据这个链接安装完依赖后,运行时发现还缺少httpx, celery, redis等库(需要自己再手动安装),补充安装库的版本号可以通过拉取源码的根目录下的setup.py文件中查询:

    在这里插入图片描述

  3. 我安装完后的所有库如下,可以直接使用安装的版本:

    alembic                         1.5.8
    amqp                            2.6.1
    apache-airflow                  2.0.2
    apache-airflow-providers-ftp    1.0.1
    apache-airflow-providers-http   1.1.1
    apache-airflow-providers-imap   1.0.1
    apache-airflow-providers-sqlite 1.0.2
    apispec                         3.3.2
    argcomplete                     1.12.3
    attrs                           20.3.0
    Babel                           2.9.0
    billiard                        3.6.4.0
    blinker                         1.4
    cached-property                 1.5.2
    cattrs                          1.5.0
    celery                          4.4.7
    certifi                         2020.12.5
    cffi                            1.14.5
    chardet                         3.0.4
    click                           7.1.2
    clickclick                      20.10.2
    colorama                        0.4.4
    colorlog                        5.0.1
    commonmark                      0.9.1
    connexion                       2.7.0
    croniter                        0.3.37
    cryptography                    3.4.7
    defusedxml                      0.7.1
    dill                            0.3.2
    dnspython                       1.16.0
    docutils                        0.17.1
    email-validator                 1.1.2
    Flask                           1.1.2
    Flask-AppBuilder                3.2.3
    Flask-Babel                     1.0.0
    Flask-Caching                   1.10.1
    Flask-JWT-Extended              3.25.1
    Flask-Login                     0.4.1
    Flask-OpenID                    1.2.5
    Flask-SQLAlchemy                2.5.1
    Flask-WTF                       0.14.3
    flower                          0.7.3
    graphviz                        0.16
    gunicorn                        19.10.0
    h11                             0.12.0
    httpcore                        0.13.3
    httpx                           0.18.1
    idna                            2.10
    importlib-metadata              1.7.0
    importlib-resources             1.5.0
    inflection                      0.5.1
    iso8601                         0.1.14
    isodate                         0.6.0
    itsdangerous                    1.1.0
    Jinja2                          2.11.3
    jsonschema                      3.2.0
    kombu                           4.6.11
    lazy-object-proxy               1.4.3
    lockfile                        0.12.2
    Mako                            1.1.4
    Markdown                        3.3.4
    MarkupSafe                      1.1.1
    marshmallow                     3.11.1
    marshmallow-enum                1.5.1
    marshmallow-oneofschema         2.1.0
    marshmallow-sqlalchemy          0.23.1
    mysqlclient                     2.0.3
    natsort                         7.1.1
    numpy                           1.20.2
    openapi-schema-validator        0.1.5
    openapi-spec-validator          0.3.0
    pandas                          1.2.4
    pendulum                        2.1.2
    pip                             21.1.1
    prison                          0.1.3
    psutil                          5.8.0
    pycparser                       2.20
    Pygments                        2.8.1
    PyJWT                           1.7.1
    PyMySQL                         1.0.2
    pyrsistent                      0.17.3
    python-daemon                   2.3.0
    python-dateutil                 2.8.1
    python-editor                   1.0.4
    python-nvd3                     0.15.0
    python-slugify                  4.0.1
    python3-openid                  3.2.0
    pytz                            2021.1
    pytzdata                        2020.1
    PyYAML                          5.4.1
    redis                           3.5.3
    requests                        2.25.1
    rfc3986                         1.5.0
    rich                            9.2.0
    setproctitle                    1.2.2
    setuptools                      56.1.0
    six                             1.15.0
    sniffio                         1.2.0
    SQLAlchemy                      1.3.24
    SQLAlchemy-JSONField            1.0.0
    SQLAlchemy-Utils                0.37.0
    swagger-ui-bundle               0.0.8
    tabulate                        0.8.9
    tenacity                        6.2.0
    termcolor                       1.1.0
    text-unidecode                  1.3
    tornado                         5.1.1
    typing-extensions               3.7.4.3
    unicodecsv                      0.14.1
    urllib3                         1.25.11
    vine                            1.3.0
    Werkzeug                        1.0.1
    wheel                           0.36.2
    WTForms                         2.3.3
    zipp                            3.4.1
    


3.安装外部依赖

  • 外部依赖根据实际需求安装,我使用的是MySQL数据库存放dag元数据与Celery executor调度执行,所以需要安装redis和MySQL


4.配置airflow 配置文件

  • airflow 源码运行入口是在:airflow->airflow->

    __main__.py

    ,该文件会导入配置文件,进行配置文件内容读取初始化

  • 如果我们不是使用源码方式运行,而是以第三方库的方式运行,在安装airflow库的时候,需要先设置airflow的环境变量叫做:

    AIRFLOW_HOME

    。如果不设置默认在当前用户的根目录下

    AIRFLOW_HOME=~/airflow

    • airflow.cfg(自动生成):存放airflow相关配置
    • logs文件夹(自动生成):airflow运行日志文件
    • dag 文件夹(这个是我自己建的文件夹):这个用于存放dag文件
    • webserver_config.py(自动生成):webserver相关配置文件
  • 编辑 airflow.cfg:

    • 修改[core]中的配置:

      – dag存放目录(我使用的是上边自己建立的目录):

      dags_folder = /root/airflow/dags


      – executor类别:

      executor = CeleryExecutor


      – sql 连接URI:

      sql_alchemy_conn = mysql+pymysql://airflow_user:airflow_pass@192.168.1.219:3306/airflow_db
  • 修改[celery]中的配置:

    • result_backend:

      result_backend = redis://192.162.1.219/15


5.初始化数据库

  • 配置启动命令参数:

    在这里插入图片描述

  • 正常初始化:

    在这里插入图片描述


6.运行webserver

  1. 通过源码的方式启动webserver 需要先编译前端所需文件,需要确保主机上安装了npm 和 yarn:

    在这里插入图片描述




    :我在自己的ubuntu 上执行这个脚本会报语法错误,就直接在终端执行shell命令, 这里有个需要注意的地方,我将命令中的

    yarn

    替换为了

    /usr/share/yarn/bin/yarn

    这是因为我在主机上同时安装了Hadoop的yarn和Node的yarn 导致两个环境变量冲突了,此处需要写Node的yarn的全路径。

    MD5SUM_FILE="static/dist/sum.md5"
    readonly MD5SUM_FILE
    
    /usr/share/yarn/bin/yarn install --frozen-lockfile
    /usr/share/yarn/bin/yarn run build
    
    find package.json yarn.lock static/css static/js -type f | sort | xargs md5sum > "${MD5SUM_FILE}"
    

    还有一个需要注意的地方是在执行

    /usr/share/yarn/bin/yarn run build

    时报了下边这个错:

    在这里插入图片描述

    此处需要将install 生成的

    package.json

    文件修改如下:

    在这里插入图片描述

  2. 然后启动webserver:

    在这里插入图片描述

在这里插入图片描述


7.启动scheduler


在这里插入图片描述

在这里插入图片描述


8.启动celery worker

  • 确保安装celery , redis 第三方库

    在这里插入图片描述


学习开源项目最好的方式就是本地运行起来源码,然后去调试它,调试环境搭建好后,接下来会从DAG创建开始,到scheduler调度,逐步调试学习airflow运行原理。



版权声明:本文为qq_42586468原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。