1. Apache Airflow 소개
1.1 Batch Process란?
예약된 시간에 실행되는 프로세스 일회성(1회)도 가능하고, 주기적인 실행도 가능
ex. 이번 주 일요일 07:00에 1번 실행되는 프로세스 ex. 매주 일요일 07:00에 실행되는 프로세스
Batch Process를 AI 엔지니어가 알아야 하는 이유
- 모델을 주기적으로 학습시키는 경우 사용(Continuous Training)
- 주기적인 Batch Serving을 하는 경우 사용
- 그 외 개발에서 필요한 배치성 작업
Batch Process - Airflow 등장 전
대표적인 Batch Process 구축 방법 → Linux Crontab
Linux Crontab의 문제
- 재실행 및 알람 : 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음. 실패할 경우, 자동으로 몇 번 더 재실행(Retry)
- 과거 실행 이력 및 실행 로그를 보기 어려움: 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듬 Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음
=> 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함
스케줄링 워크플로우 전용 도구
1.2 Airflow 소개
- 현재 스케줄링, 워크플로우 도구의 표준
- 에어비앤비(Airbnb)에서 개발
- 현재 릴리즈된 버전은 2.2.0으로, 업데이트 주기가 빠름
- 스케줄링 도구로 무거울 수 있지만, 거의 모든 기능을 제공하고, 확장성이 넓어 일반적으로 스케줄링과 파이프라인 작성 도구로 많이 사용
- 특히 데이터 엔지니어링 팀에서 많이 사용
Airflow 기능
- 파이썬을 사용해 스케줄링 및 파이프라인 작성
- 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공
- 실패 시 알람
- 실패 시 재실행 시도
- 동시 실행 워커 수
- 설정 및 변수 값 분리
2. Apache Airflow 실습
2.1 DAG과 Task
- Airflow에서는 스케줄링할 작업을 DAG이라고 부름
- DAG은 Directed Acyclic Graph(순환하지 않는 방향이 존재하는 그래프)의 약자로, Airflow에 한정된 개념이 아닌 소프트웨어 자료구조에서 일반적으로 다루는 개념
- 각 DAG은 Task로 구성
- DAG 내 Task는 순차적으로 실행되거나, 동시에(병렬로) 실행할 수 있음
- DAG 1개 : 1개의 파이프라인
- Task : DAG 내에서 실행할 작업
- ex) DAG의 3가지 Task 구성 -> extract, transform, load
2.2 유용한 Operator
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html
airflow.operators — Airflow Documentation
airflow.apache.org
airflow.operators.python.PythonOperator
- 파이썬 함수를 실행
- 함수 뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행할 수 있음
- 실행할 파이썬 로직을 함수로 생성한 후, PythonOperator로 실행
airflow.operators.bash.BashOperator
- Bash 커맨드를 실행
- 실행해야 할 프로세스가 파이썬이 아닌 경우에도 BashOperator로 실행 가능
- ex. shell 스크립트, scala 파일 등
airflow.operators.empty.EmptyOperator
- 아무것도 실행하지 않음
- DAG 내에서 Task를 구성할 때, 여러 개의 Task의 SUCCESS를 기다려야 하는 복잡한 Task 구성에서 사용
airflow.providers.http.operators.http.SimpleHttpOperator
- 특정 호스트로 HTTP 요청을 보내고 Response를 반환
- 파이썬 함수에서 requests 모듈을 사용한 뒤 PythonOperator로 실행시켜도 무방
- 다만 이런 기능이 Airflow Operator에 이미 존재하는 것을 알면 좋음
클라우드의 기능을 추상화한 Operator도 존재(AWS, GCP 등)
3. Apache Airflow 아키텍처
DAG Directory
- DAG 파일들을 저장
- 기본 경로는 $AIRFLOW_HOME/dags
- DAG_FOLDER 라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
- Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱
Scheduler
- Scheduler는 각종 메타 정보의 기록을 담당
- DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
- DAG들의 스케줄링 관리 및 담당
- 실행 진행 상황과 결과를 DB에 저장
- Executor를 통해 실제로 스케줄링된 DAG을 실행
- Airflow에서 가장 중요한 컴포넌트
Scheduler - Executor
- Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
- Local Executor : DAG Run을 프로세스 단위로 실행
- Local Executor
- 하나의 DAG Run을 하나의 프로세스로 띄워서 실행
- 최대로 생성할 프로세스 수를 정해야 함 - Airflow를 간단하게 운영할 때 적합
- Sequential Executor
- 하나의 프로세스에서 모든 DAG Run들을 처리
- Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용
- Airflow를 테스트로 잠시 운영할 때 적합
- Local Executor
- Remote Executor: DAG Run을 외부 프로세스로 실행
- Celery Executor
- DAG Run을 Celery Worker Process로 실행
- 보통 Redis를 중간에 두고 같이 사용
- Local Executor를 사용하다가, Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
- Kubernetes Executor
- 쿠버네티스 상에서 Airflow를 운영할 때 사용
- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
- Airflow 운영 규모가 큰 팀에서 사용
- Celery Executor
- Local Executor : DAG Run을 프로세스 단위로 실행
Workers
- DAG을 실제로 실행
- Scheduler에 의해 생기고 실행
- Executor에 따라 워커의 형태가 다름
- Celery 혹은 Local Executor인 경우, Worker는 프로세스
- Kubernetes Executor인 경우, Worker는 pod
- DAG Run을 실행하는 과정에서 생긴 로그를 저장
Metadata Database
- 메타 정보를 저장
- Scheduler에 의해 Metadata가 쌓임
- 보통 MySQL이나 Postgres를 사용
- 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
- User와 Role (RBAC)에 대한 정보 저장
- Scheduler와 더불어 핵심 컴포넌트
- 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
- 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 외부 DB 인스턴스를 사용
Webserver
- WEB UI를 담당
- Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
- 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF 하며, 현 상황을 파악
- REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
- 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음(반면 Scheduler의 작동 여부는 매우 중요)