본문 바로가기

DE/Airflow

[Airflow] Apache Airflow(아파치 에어플로우) 기초, DAG, operator

1. Apache Airflow 소개

1.1 Batch Process란?

예약된 시간에 실행되는 프로세스 일회성(1회)도 가능하고, 주기적인 실행도 가능

ex. 이번 주 일요일 07:00에 1번 실행되는 프로세스 ex. 매주 일요일 07:00에 실행되는 프로세스

Batch Process를 AI 엔지니어가 알아야 하는 이유

  • 모델을 주기적으로 학습시키는 경우 사용(Continuous Training)
  • 주기적인 Batch Serving을 하는 경우 사용
  • 그 외 개발에서 필요한 배치성 작업

Batch Process - Airflow 등장 전

대표적인 Batch Process 구축 방법 → Linux Crontab

Linux Crontab의 문제

  • 재실행 및 알람 : 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음. 실패할 경우, 자동으로 몇 번 더 재실행(Retry)
  • 과거 실행 이력 및 실행 로그를 보기 어려움: 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듬 Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음

=> 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함

 

스케줄링 워크플로우 전용 도구

1.2 Airflow 소개

  • 현재 스케줄링, 워크플로우 도구의 표준
  • 에어비앤비(Airbnb)에서 개발
  • 현재 릴리즈된 버전은 2.2.0으로, 업데이트 주기가 빠름
  • 스케줄링 도구로 무거울 수 있지만, 거의 모든 기능을 제공하고, 확장성이 넓어 일반적으로 스케줄링과 파이프라인 작성 도구로 많이 사용
  • 특히 데이터 엔지니어링 팀에서 많이 사용

Airflow 기능

  • 파이썬을 사용해 스케줄링 및 파이프라인 작성
  • 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공
  • 실패 시 알람
  • 실패 시 재실행 시도
  • 동시 실행 워커 수
  • 설정 및 변수 값 분리

2. Apache Airflow 실습

2.1 DAG과 Task

  • Airflow에서는 스케줄링할 작업을 DAG이라고 부름
  • DAG은 Directed Acyclic Graph(순환하지 않는 방향이 존재하는 그래프)의 약자로, Airflow에 한정된 개념이 아닌 소프트웨어 자료구조에서 일반적으로 다루는 개념 
  • 각 DAG은 Task로 구성
  • DAG 내 Task는 순차적으로 실행되거나, 동시에(병렬로) 실행할 수 있음
    • DAG 1개 : 1개의 파이프라인
    • Task : DAG 내에서 실행할 작업
    • ex) DAG의 3가지 Task 구성 -> extract, transform, load

2.2 유용한 Operator

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

 

airflow.operators — Airflow Documentation

 

airflow.apache.org

airflow.operators.python.PythonOperator

  • 파이썬 함수를 실행
  • 함수 뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행할 수 있음
  • 실행할 파이썬 로직을 함수로 생성한 후, PythonOperator로 실행

airflow.operators.bash.BashOperator

  • Bash 커맨드를 실행
  • 실행해야 할 프로세스가 파이썬이 아닌 경우에도 BashOperator로 실행 가능
  • ex. shell 스크립트, scala 파일 등

airflow.operators.empty.EmptyOperator

  • 아무것도 실행하지 않음
  • DAG 내에서 Task를 구성할 때, 여러 개의 Task의 SUCCESS를 기다려야 하는 복잡한 Task 구성에서 사용

airflow.providers.http.operators.http.SimpleHttpOperator

  • 특정 호스트로 HTTP 요청을 보내고 Response를 반환
  • 파이썬 함수에서 requests 모듈을 사용한 뒤 PythonOperator로 실행시켜도 무방
  • 다만 이런 기능이 Airflow Operator에 이미 존재하는 것을 알면 좋음

클라우드의 기능을 추상화한 Operator도 존재(AWS, GCP 등)

 

3. Apache Airflow 아키텍처

DAG Directory

  • DAG 파일들을 저장
  • 기본 경로는 $AIRFLOW_HOME/dags
  • DAG_FOLDER 라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
  • Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱

Scheduler

  • Scheduler는 각종 메타 정보의 기록을 담당
  • DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
  • DAG들의 스케줄링 관리 및 담당
  • 실행 진행 상황과 결과를 DB에 저장
  • Executor를 통해 실제로 스케줄링된 DAG을 실행
  • Airflow에서 가장 중요한 컴포넌트

Scheduler - Executor

  • Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
    • Local Executor : DAG Run을 프로세스 단위로 실행
      • Local Executor
        • 하나의 DAG Run을 하나의 프로세스로 띄워서 실행
        • 최대로 생성할 프로세스 수를 정해야 함 - Airflow를 간단하게 운영할 때 적합
      • Sequential Executor
        • 하나의 프로세스에서 모든 DAG Run들을 처리
        • Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용
        • Airflow를 테스트로 잠시 운영할 때 적합
    • Remote Executor: DAG Run을 외부 프로세스로 실행
      • Celery Executor
        • DAG Run을 Celery Worker Process로 실행
        • 보통 Redis를 중간에 두고 같이 사용
        • Local Executor를 사용하다가, Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
      • Kubernetes Executor
        • 쿠버네티스 상에서 Airflow를 운영할 때 사용
        • DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
        • Airflow 운영 규모가 큰 팀에서 사용

Workers

  • DAG을 실제로 실행
  • Scheduler에 의해 생기고 실행
  • Executor에 따라 워커의 형태가 다름
  • Celery 혹은 Local Executor인 경우, Worker는 프로세스
  • Kubernetes Executor인 경우, Worker는 pod
  • DAG Run을 실행하는 과정에서 생긴 로그를 저장

Metadata Database

  • 메타 정보를 저장
  • Scheduler에 의해 Metadata가 쌓임
  • 보통 MySQL이나 Postgres를 사용
  • 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
  • User와 Role (RBAC)에 대한 정보 저장
  • Scheduler와 더불어 핵심 컴포넌트
  • 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
  • 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 외부 DB 인스턴스를 사용

Webserver

  • WEB UI를 담당
  • Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
  • 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF 하며, 현 상황을 파악
  • REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
  • 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음(반면 Scheduler의 작동 여부는 매우 중요)