설치
설치
sudo yum install -y [https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm](https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm)
sudo yum install -y postgresql13-server
sudo /usr/pgsql-13/bin/postgresql-13-setup initdb
sudo systemctl enable postgresql-13
sudo systemctl start postgresql-13
sudo systemctl status postgresql-13
- 계정확인
cat /etc/passwd | grep postgres
- 계정설정
su - postgres
psql
\list
CREATE DATABASE airflow;
\list
CREATE USER airflow WITH ENCRYPTED PASSWORD 'airflow';
\du
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
- 환경변수
vi ~/.bashrc
export AIRFLOW_HOME=/opt/airflow
source ~/.bashrc
- install airflow
pip3 install apache-airflow
- fix airflow cfg
cd $AIRFLOW_HOME
vi airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
base_url = http://localhost:8080
web_server_port = 8080
executor = LocalExecutor
load_examples = False
- excutor는 병렬처리 가능한 LocalExecutor 변경
- airflow 실행
airflow db init
airflow users create --username admin --password admin --role Admin --email [email protected] --firstname admin --lastname admin
airflow scheduler -D > /dev/null
airflow webserver -D
Airflow의 구성
User interface/웹 서버: 웹 대시보드 UIScheduler: DAG와 작업들을 모니터링하고 실행 순서와 상태를 관리한다.MetaStore: 메타 데이터를 관리한다. 순서나 작업 스케쥴링 등등Executor: 테스크가 어떻게 실행되는지 정의하는 컴포넌트Worker: 테스크를 실행하는 프로세스 ,Executor 종류에 따라 동작 방식이 다양함
⚪️ 오퍼레이터(Operator)
-> 하나의 테스크를 정의하는데 사용된다.
🤔 DAGs가 작동하면서 workflow를 어떻게 작동하는지 묘사하는 것 !
하나의 테스크들이 모여 하나의 DAG을 구성하고, DAG들이 모여 workflow가 이루어짐
operator는 DAG를 구성하기 위한 가장 작은 단위로 봐도 된다.
액션 오퍼레이터(Action Operators) : 실제 연산을 수행한다.
트랜스퍼 오퍼레이터(Transfer Operators) : 데이터를 옮긴다.
센서 오퍼레이터 (Sensor Operators) : 테스크를 언제 실행시킬 지 트리거를 기다린다.
⚪️ 테스크(Task)
- 오퍼레이터를 실행시키면 테스크가 된다.
- 테스크는 오퍼레이터의 인스턴스
⚪️ Workflow
작업 흐름이라는 뜻을 가지고 있고, 가장 작은 단위는 Operator들이고 이것들이 모여서 Task, Task들이 모여 DAG, DAG들이 모여 Workflow가 된다.
🟧 Workflow << DAGs << Tasks(Operators) 로 커짐 !
⚪️ DAG (Directed Acyclic Graph)
- 비선형 구조의 그래프라는 뜻, 한쪽으로 방향이 흐를 수 있다는 것을 뜻함.
- 관계 의존성을 그래프로 그려주어 사용자가 flow를 잘 파악할 수 있도록 도와주는 역할을 함 !
🟧 기본적인 DAG 작성법
**1. DAG 선언
2. task 정의
3. '>>','<<'등으로 stream 정의
**
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import datys_ago
default_args = {
'owner':'airflow'
'retries':1
'retry_delay':timedelta(minutes = 5)
}
with DAG(
'tutorial',
default_args = default_args,
descriptions = 'tutorial DAG',
schedule_interval = timedelta(days = 1),
start_date = days_ago(2),
tags = ['example']
) as dag:
t1 = BashOperator(
task_id= 'print_date',
bash_command = 'date',
)
t2 = BashOperator(
task_id = 'sleep',
depends_on_past = False,
bash_command = 'sleep 5',
retries = 3,
)
t1>>t2
✔️ Airflow의 유용성
데이터 웨어하우스, 머신 러닝, 실험, 데이터 인프라 관리에 쓰임!

- DAG를 작성하여 Workflow를 만든다. DAG는 Task로 구성되어 있음
- Task는 Operator가 인스턴스화 된 것.
- DAG를 실행 시킬 때 Scheduler는 DagRun 오브젝트를 만든다.
- DagRun 오브젝트는 Task Instance를 만든다.
- Worker가 Task를 수행 후 DagRun의 상태를 '완료'로 바꿔놓는다.