설치

설치

sudo yum install -y [https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm](https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm)  
sudo yum install -y postgresql13-server  
sudo /usr/pgsql-13/bin/postgresql-13-setup initdb  
sudo systemctl enable postgresql-13  
sudo systemctl start postgresql-13  
sudo systemctl status postgresql-13
cat /etc/passwd | grep postgres
su - postgres  
psql

\list  
CREATE DATABASE airflow;  
\list  
CREATE USER airflow WITH ENCRYPTED PASSWORD 'airflow';  
\du  
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
vi ~/.bashrc   
export AIRFLOW_HOME=/opt/airflow  
source ~/.bashrc
pip3 install apache-airflow
cd $AIRFLOW_HOME  
vi airflow.cfg

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow  
base_url = http://localhost:8080  
web_server_port = 8080  
executor = LocalExecutor  
load_examples = False
  1. excutor는 병렬처리 가능한 LocalExecutor 변경
airflow db init  
airflow users create --username admin --password admin --role Admin --email [email protected] --firstname admin --lastname admin  
airflow scheduler -D > /dev/null  
airflow webserver -D

Airflow의 구성

⚪️ 오퍼레이터(Operator)

-> 하나의 테스크를 정의하는데 사용된다.
🤔 DAGs가 작동하면서 workflow를 어떻게 작동하는지 묘사하는 것 !
하나의 테스크들이 모여 하나의 DAG을 구성하고, DAG들이 모여 workflow가 이루어짐
operator는 DAG를 구성하기 위한 가장 작은 단위로 봐도 된다.

액션 오퍼레이터(Action Operators) : 실제 연산을 수행한다.
트랜스퍼 오퍼레이터(Transfer Operators) : 데이터를 옮긴다.
센서 오퍼레이터 (Sensor Operators) : 테스크를 언제 실행시킬 지 트리거를 기다린다.

⚪️ 테스크(Task)

⚪️ Workflow

작업 흐름이라는 뜻을 가지고 있고, 가장 작은 단위는 Operator들이고 이것들이 모여서 Task, Task들이 모여 DAG, DAG들이 모여 Workflow가 된다.

🟧 Workflow << DAGs << Tasks(Operators) 로 커짐 !

⚪️ DAG (Directed Acyclic Graph)

🟧 기본적인 DAG 작성법

**1. DAG 선언
2. task 정의
3. '>>','<<'등으로 stream 정의
**

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import datys_ago

default_args = {
	'owner':'airflow'
    'retries':1
    'retry_delay':timedelta(minutes = 5)
    
}
with DAG(
	'tutorial',
    default_args = default_args,
    descriptions = 'tutorial DAG',
    schedule_interval = timedelta(days = 1),
    start_date = days_ago(2),
    tags = ['example']
    
) as dag:

t1 = BashOperator(
	task_id= 'print_date',
    bash_command = 'date',
)
t2 = BashOperator(
	task_id = 'sleep',
    depends_on_past = False,
    bash_command = 'sleep 5',
    retries = 3,

)

t1>>t2

✔️ Airflow의 유용성

데이터 웨어하우스, 머신 러닝, 실험, 데이터 인프라 관리에 쓰임!

airflow_structure.png

  1. DAG를 작성하여 Workflow를 만든다. DAG는 Task로 구성되어 있음
  2. Task는 Operator가 인스턴스화 된 것.
  3. DAG를 실행 시킬 때 Scheduler는 DagRun 오브젝트를 만든다.
  4. DagRun 오브젝트는 Task Instance를 만든다.
  5. Worker가 Task를 수행 후 DagRun의 상태를 '완료'로 바꿔놓는다.