Airflow 1.10.2 on CentOS 6.5

Update the package sources
yum install https://centos6.iuscommunity.org/ius-release.rpm -y
wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-6.repo
yum makecache
Install Python 3.6
yum install python36u python36u-devel -y
ln -s /usr/bin/python3.6 /bin/python3
Install pip
yum install python36u-pip -y
ln -s /usr/bin/pip3.6 /bin/pip3
pip3 install --upgrade pip
Install virtualenv
pip3 install virtualenv
Install Airflow
  • Create the directories

    mkdir -p /opt/airflow
    mkdir -p /opt/airflow/airflow
    
    cd /opt/airflow/
    
  • Create and activate the virtualenv

    virtualenv -p `which python3.6` venv
    source venv/bin/activate
    
    To exit the virtualenv:
    deactivate
    
  • Install dependency packages

    pip3 install cryptography
    pip3 install flask-bcrypt
    pip3 install mysql-connector
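    
    cryptography is pulled in so Airflow can Fernet-encrypt connection passwords in the metadata database. A key can be generated with:
    
    python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
    
    Once airflow.cfg exists (created by the first airflow command below), put the output into fernet_key under [core].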
    
  • Install

    pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
    
    export AIRFLOW_HOME=/opt/airflow/airflow
    export SLUGIFY_USES_TEXT_UNIDECODE=yes
    pip3 install apache-airflow==1.10.2
    
    Or, as a one-liner:
    AIRFLOW_HOME=/opt/airflow/airflow SLUGIFY_USES_TEXT_UNIDECODE=yes pip3 install apache-airflow==1.10.2
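    
    A quick sanity check after installing:
    airflow version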
    
Configuration
  • Set the time zone

    vim $AIRFLOW_HOME/airflow.cfg
    
    default_timezone = Asia/Shanghai
    expose_config = True
    
    Note: in 1.10.x the web UI still renders times in UTC; default_timezone affects how naive schedule dates are interpreted. Verify the setting:
    python -c "from airflow.utils import timezone;print(timezone.datetime(2019,7,1))"
    
  • Configure the MySQL database

    • Install packages

      yum install mysql-devel -y   # CentOS equivalent of Debian's libmysqlclient-dev
      pip3 install apache-airflow[mysql]
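      
      Airflow's MySQL backend needs explicit_defaults_for_timestamp enabled, otherwise airflow initdb aborts. Set it in my.cnf and restart mysqld:
      
      [mysqld]
      explicit_defaults_for_timestamp = 1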
      
    • Create the database

      CREATE DATABASE airflow;
      ALTER DATABASE `airflow` CHARACTER SET utf8; 
      CREATE USER 'airflow'@'%' IDENTIFIED BY 'af123854';
      GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY 'af123854';
      FLUSH PRIVILEGES;
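      
      Verify the account works (assuming MySQL runs on this host):
      mysql -u airflow -paf123854 -h 127.0.0.1 -e "SHOW DATABASES;"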
      
    • Edit the cfg

      vim $AIRFLOW_HOME/airflow.cfg
      
      sql_alchemy_conn = mysql://airflow:[email protected]/airflow
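      
      Note: the bare mysql:// scheme goes through the MySQLdb/mysqlclient driver that apache-airflow[mysql] installs. To use the mysql-connector package installed earlier instead, the SQLAlchemy URI would be:
      sql_alchemy_conn = mysql+mysqlconnector://airflow:[email protected]/airflow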
      
  • Configure Celery + RabbitMQ

    • Install packages

      pip3 install apache-airflow[celery] 
      pip3 install apache-airflow[rabbitmq]
      
    • Create the user, vhost, and permissions

      sudo rabbitmqctl add_user airflow af123854
      sudo rabbitmqctl add_vhost airflow-rabbitmq
      sudo rabbitmqctl set_user_tags airflow airflow-rabbitmq
      sudo rabbitmqctl set_permissions -p airflow-rabbitmq airflow ".*" ".*" ".*"
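      
      Confirm the grants:
      sudo rabbitmqctl list_permissions -p airflow-rabbitmq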
      
    • Edit the cfg

      vim $AIRFLOW_HOME/airflow.cfg
      
      executor = CeleryExecutor
      
      broker_url = amqp://airflow:[email protected]:5672/airflow-rabbitmq
      
      Airflow 1.9.0+ ships with Celery 4.x, which serializes messages as JSON rather than pickle; use the pyamqp transport instead:
      broker_url = pyamqp://airflow:[email protected]:5672/airflow-rabbitmq
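      
      A quick broker connectivity check (kombu ships with celery; host assumed to be local):
      python -c "from kombu import Connection; Connection('pyamqp://airflow:af123854@localhost:5672/airflow-rabbitmq').connect()"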
      
      For result_backend a database is recommended; create it on the MySQL side:
      CREATE DATABASE result_backend;
      ALTER DATABASE `result_backend` CHARACTER SET utf8;
      GRANT all privileges on result_backend.* TO 'airflow'@'%' IDENTIFIED BY 'af123854';
      FLUSH PRIVILEGES;
      
      vim $AIRFLOW_HOME/airflow.cfg
      result_backend = db+mysql://airflow:[email protected]/result_backend
      
  • commands

    • init

      airflow initdb
      
    • webserver

      Start:
      nohup airflow webserver -p 8080 > /opt/airflow/airflow/logs/webserver.log 2>&1 &
      
      Stop:
      ps aux | grep webserver | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
      
      Visit:
      http://127.0.0.1:8080
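      
      flask-bcrypt (installed earlier) backs the password login for the UI. A sketch of creating a user, per the Airflow 1.10 security docs: first set authenticate = True and auth_backend = airflow.contrib.auth.backends.password_auth under [webserver] in airflow.cfg, then run something like this (username/password are placeholders):
      
      # create_user.py, run once inside the virtualenv
      import airflow
      from airflow import models, settings
      from airflow.contrib.auth.backends.password_auth import PasswordUser
      
      user = PasswordUser(models.User())
      user.username = 'admin'
      user.email = 'admin@example.com'
      user.password = 'admin123'   # hashed with bcrypt by the setter
      
      session = settings.Session()
      session.add(user)
      session.commit()
      session.close()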
      
    • scheduler

      Start:
      nohup airflow scheduler > ./scheduler.out 2>&1 &
      
      Stop:
      ps aux | grep scheduler | grep -v failover | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
      
    • flower

      Start:
      nohup airflow flower  > /opt/airflow/airflow/logs/flower.out 2>&1 &
      
      Stop:
      ps aux | grep flower | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
      
      Visit:
      http://127.0.0.1:5555
      
    • worker

      Start (C_FORCE_ROOT allows the Celery worker to run as root):
      export C_FORCE_ROOT=True
      nohup airflow worker > ./worker.out 2>&1 &
      
      Start pinned to a specific queue (tasks are routed there via the operator's queue argument; see the sketch after this block):
      nohup airflow worker -q testQueue > ./worker.out 2>&1 &
      
      Stop:
      ps aux | grep worker | grep celeryd  | awk '{print $2}'| xargs -n 1 kill -9
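      
      How a task lands on testQueue, as a hypothetical DAG fragment (queue is a standard BaseOperator argument honored by the CeleryExecutor):
      
      from airflow.operators.bash_operator import BashOperator
      
      routed = BashOperator(
          task_id='routed_task',
          bash_command='echo routed',
          queue='testQueue',   # only workers started with -q testQueue consume this
          dag=dag,             # assumes a DAG object defined elsewhere
      )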
      
    • Other commands

      airflow list_dags
      airflow delete_dag tutorial
      airflow list_tasks tutorial
      airflow list_tasks tutorial --tree 
      
      airflow test <dag_id> <task_id> <execution_date>   # run a single task without recording state
      airflow run <dag_id> <task_id> <execution_date>    # run a task instance
      airflow backfill <dag_id> -s <start_date> -e <end_date>   # run a backfill over the date range
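      
      To give these commands something to act on, a minimal DAG can be dropped into $AIRFLOW_HOME/dags/; a sketch (file name and ids are placeholders):
      
      # /opt/airflow/airflow/dags/example_sketch.py
      from datetime import datetime
      from airflow import DAG
      from airflow.operators.bash_operator import BashOperator
      
      dag = DAG(
          dag_id='example_sketch',
          start_date=datetime(2019, 7, 1),
          schedule_interval='@daily',
      )
      
      hello = BashOperator(
          task_id='say_hello',
          bash_command='echo hello',
          dag=dag,
      )
      
      Then, for example:
      airflow list_tasks example_sketch
      airflow test example_sketch say_hello 2019-07-01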
      