What is Apache Airflow? How is it most commonly used?

에어플로우는 Python 을 사용하여 정해진 시간/조건에 원하는 작업을 실행하도록 만들어주는 ETL 오케스트레이션 도구
에어플로우를 통해 어떤 작업이 언제 실행되는지 설정하여 데이터 파이프라인을 정의 할 수 있고
작업의 경과를 WebUI 로 모니터링 할 수 있어서
주기적으로 실행되어야 하는 작업들을 자동화하고 편하게 관리할 수 있음


Airflow is an ETL orchestration tool that uses Python. It triggers jobs at specific points in time.
We can monitor the process of the task via the WebUI.



 

What is a DAG and DagRun?

DAG 란 작업과 그 작업들 간의 관계를 나타내는 그래프임
명확한 시작점과 끝점이 있으며, 작업들 사이에 "순환"이 없음

airflow 에서 DAG 란 비순환 그래프를 이루는 데이터 파이프라인으로 이해할 수 있음
DAG 내에 여러 operators 를 설정하고 트리거하여 작업을 실행함
스케줄에 따라 무엇을 어떻게 실행할지 정의한 설계도
DAG 자체는 실행되지 않음. 그냥 스크립트임

DagRun 은 DAG 를 실제로 실행한 단위
(DagRun represents one execution of DAG)

DAG 가 여러번 실행되면 여러 DagRun 이 생성됨
각 DagRun 은 하나의 logical_date 를 갖음



 

What are the three parameters needed to define a DAG?

DAG 를 정의하기 위해 필요한 세 가지는 start_date, dag_id, 그리고 schedule

ID : DAG를 고유하게 식별하는 아이디이며 일반적으로 짧은 문자열로 설정함
start_date : DAG가 트리거되는 첫 번째 간격의 날짜 및 시간

schedule : DAG가 실행될 빈도. 매주, 매일, 매시간 또는 사용자 지정 값으로 설정. cron 표현식이나 @daily 등으로 설정 가능



ID is a unique value used to identify DagRun. It is typically composed of a short string.
start_date is the date and time of the first interval when the DAG is triggered.
schedule refers to the frequency at which the DAG will run.



 

What is an Airflow task and operator?
Whats the difference between them?

operatorDAG 에서 작업을 정의하는 템플릿
각 operator 마다 하나의 고유 작업을 설정할 수 있음
이를 테면, PythonOperator 는 python 코드를 실행하고
SparkSubmitOperator 는 spark 에 job 을 제출할 수 있음

Operator 가 작업을 정의한 템플릿이라면,
task 는 Operator 를 인스턴스화한 실행 단위임 (execution unit)
task_a = PythonOperator(...) 여기서 task_a 가 바로 task 임 (아직 실행된 거 아님)
마치 OOP 언어의 object 와 instance 의 관계같음

task instance 는 task 가 실행되면 생기는 실행 단위
예를 들어, 어떤 DAG 안에 3개의 Operator 가 존재하고 각각 task_a, task_b, task_c 를 설정했다고 하자
Scheduler 가 해당 DAG 를 실행하면 세 개의 task 들이 저마다 Task Instance 를 만듦
task instance 에는 메타데이터가 포함되어있음
(언제 실행되었고, task_id 는 무엇이고, logical_date 는 몇이고 누가 실행했고 어떻게 실행되었고... 등)
해당 정보는 Metastore 에 저장되며 WebUI 에서 task 상태 확인 가능



A task instance is an execution unit that is created when a task is executed.




from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def hello():
    print("hello")

with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=True
) as dag:

    t1 = PythonOperator(
        task_id="print_hello",
        python_callable=hello
    )


위와 같은 DAG 가 존재하고 이를 실행한다고 하자

Operator : 무슨 작업을 할 지 정의한 템플릿. 위 예제에서는 PythonOperator
task : Operator 의 인스턴스. 위 예제의 t1. 아직 실행된 거 아님

25년 6월 25일에 실행하면

DagRun : DAG 를 특정 시점(logical_date)에 실행한 기록. DAG 실행 1회를 의미함
DagRun 은 dag_id, logical_date(2025.06.24) 등을 갖음

task instance : task(위 예제의 t1)를 실행한 인스턴스. Operator 가 아닌 DAG 도 아닌 task 를 실행한 것임
이렇게 생성된 task instance 를  executor 가 worker 에게 전달하고
worker 들이 task instance 를 실행하게 됨
그리고 worker 가 실행한 결과는 task instance logic 에 의해 DB 에 저장됨









 

What are the core components of Airflow’s architecture?

Scheduler : 사용자가 정의한 DAG 를 주기마다 파싱, 분석한 'DAG 상태' 및 'DAG 메타데이터'를 metastore 에 넣음
(DAG 파일 자체는 .py 파일 그대로 존재함. metastore 에 저장되는 게 아님)
트리거 될 DAG 들을 확인하고 실행 가능하겠다 싶으면 실행하여 DagRun, task_instance 를 만든 후 Executor 에게 넘겨줌

Executor : Scheduler 가 작업을 실행하기 위한 전략
Scheduler 로부터 받은 실행 가능한 task_instance 들을 queue 에 넣고, worker 에 나눠져서 실행될 수 있도록 함
Local Executor, Celery Executor 등 어떤 방식으로 작업을 실행할지 결정함

WebServer : 사용자가 Airflow 와 소통하는 창구
설정된 값, DAG, task 의 상태를 모니터링 할 수 있고, 수동으로 DAG 를 trigger 할 수 있음

Metastore : airflow 의 DAG 및 task 상태 정보를 저장하는 저장소.
모든 컴포넌트가 DB 를 통해 상태 정보를 공유하기 때문에, single source of truth 라고 불림

worker 들은 queue 로부터 task instance 를 가져가서 실행함
내부 task instance login 에 의해, 실행한 task instance 의 상태나 결과가 metastore 에 기록됨
그래서 사용자가 WebUI 를 통해 확인 가능
(스케줄러가 worker 들의 결과 받아서 DB 에 저장하는 거 아님)


 

What are Task Groups? How are they used within DAGs?

task group 은 tasks 를 논리적으로 함께 묶어주는 역할을 함
사용자 입장에서 task 를 보기 편하게 구분할 수 있도록 도와줌
성격별로, 역할별로 task 를 그룹지어두면 이해하거나 설명할 때 편함



 

Given a data_interval_start and data_interval_end, when is a DAG executed?

data_interval_start 는 DAG 에 설정된 실행 주기(interval) 의 시작점이고
data_interval_end 는 DAG 에 설정된 실행 주기(interval) 의 끝점임
예를 들어, 매일 새벽 2시에 트리거되도록 스케줄링 된 DAG 의 interval 은 하루이며
2025년 6월 25일 새벽 2시에 트리거되었을 때
data_interval_start 는 25년 6월 24일 오전 2시,
data_interval_end 는 25년 6월 25일 오전 2시


 

What is the catchup parameter, and how does it impact the execution of an Airflow DAG?

catchup 파라미터를 true 로 설정하면, start_date 부터 logical_date 까지 scheduling 에 맞춰 실행됨(DagRun 생성)
만약 catchup 이 false 라면, DAG 를 active 했을 때 과거에 실행되지 않았던 날짜들은 무시하고, active 한 이후로만 트리거됨

마지막으로 실행한 날짜부터 오늘까지 아님. start_date 부터 오늘까지

The catchup parameter controls whether Airflow creates DAG runs for past scheduled intervals that were missed.

If catchup=True, when a DAG is enabled, the scheduler will create DagRuns for all past execution dates starting from the start_date up to the current date, based on the DAG’s schedule.

If catchup=False, the scheduler skips all past intervals and only creates a DagRun for the next upcoming schedule, meaning historical runs are not backfilled automatically.


This parameter is commonly set to False for streaming pipelines or non-idempotent jobs, and set to True when historical data processing is required.



 

What are XComs, and how are they typically used?

xcom 은 같은 DAG 내의 여러 tasks 간에 소량의 데이터를 공유하기 위한 기능
PythonOperator, BashOperator, Sensor... 등 다양한 operator 들을 통해 데이터를 공유할 수 있음

xcom 데이터는 key-value 형태로 metastore  에 저장됨
webui 에서 저장 가능

xcom 은 기본적으로 json-serializeable 한 값만 저장하는 것이 권장됨
대량의 데이터를 공유 할 때는 외부 storage 를 사용


 

What is idempotency? 
Why is this important to keep in mind when building Airflow DAGs?

멱등성이란, 여러번 실행해도 동일한 결과를 만드는 것을 말 함
airflow 에서 DAG 는 멱등성을 갖춰 작성되어야 함
airflow 작업이 실패하여 재실행 하거나, 두 번 이상 실행 될 수 있기 때문

catchup, backfill  등으로 과거 날짜 DagRun 을 다시 실행하게 되는 경우에도
동일한 결과를 출력해야 하기 때문에 멱등성을 중요함 
멱등성이 지켜지지 않으면 중복데이터 발생 가능



 

After writing a DAG, how can you test that DAG? Walk through the process from start to finish.

- pytest, unittest 등으로 PythonOperator 로직 테스트
- 각 task 가 정상 동작하는지 CLI 로 검사 : airflow tasks test my_dag my_task_id 2025-06-25
  scheduler, executor 없이 로컬에서 task 만 실행 가능함
- dag 가 정상 동작하는지 CLI 로 검사 : airflow dags test my_dag 2025-06-25
  dag 전체를 순서대로 실행 가능함
- test 환경의 airflow 에서 dag 를 실행해 봄


airflow tasks test CLI 로 테스트하면
scheduler, executor, worker 등이 관여하지 않음
의존성 상관 없이, 해당 task 하나만 로컬에서 (실제로) 실행해서 로직 이상 여부를 확인

airflow dags test CLI 로 테스트하면
scheduler, executor, worker 등이 관여하지 않음
dag 전체 구조 올바른지, task 의존성이 올바른지 확인



 

To manage credentials for connecting to tools such as databases, APIs, and SFTP sites, 
what functionality does Airflow provide?

'connections' 란, 외부 시스템에 접근하기 위한 접속 정보를 관리하는 기능
DB, APIs, S3, RestAPI, SFTP, Kafka 등

connections 에 접속 정보를 넣어두면, DAG 코드 안에서 password 등을 직접 넣지 않아도 됨

WebUI 를 통해 connections 관리 가능하며
connections 에 저장된 접속 정보는 metastore 에 저장됨

conn_id : 컨넥션 이름
conn_type : mysql, sftp, http 등
host : 서버 주소
schema : DB name, path 등
login : username
passowrd : password
port : port
extra : json 형태의 추가 설정


connections 에 저장된 정보를 사용 할 때는,
아래처럼 Hook 을 사용하여 conn_id 와 연결해주면 끝!

from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.operators.python import PythonOperator

def download_file():
    hook = SFTPHook(ftp_conn_id="my_sftp_conn")
    hook.retrieve_file(
        remote_full_path="/remote/data.csv",
        local_full_path="/tmp/data.csv"
    )

task = PythonOperator(
    task_id="download_from_sftp",
    python_callable=download_file
)


다양한 Hook 이 있음
SFTPHook, MySqlHook, S3Hook....


추가로, connections 는 환경별로 다르게 관리 가능함
이를테면, dev, stage, prod 별로 다르게 설정 가능!





Airflow 에는 'Variables' 라는 기능도 있음

DAG, task 에서 참조할 수 있는 '전역 설정 값(key-value)' 저장소임
문자열, 숫자, json 등 저장 가능함.
Key-value 형태로 저장

connections 처럼 variables 도 WebUI 에서 설정하며,
환경별(dev, stage, prod)로 다르게 관리 가능함

주로 S3 버킷이름, 임계값(threshold), 파일 경로 등 고정적인 정보 저장용이며
비밀번호나 key 등의 민감한 저장을 위한 용도로 사용하지 않음

아래처럼 variable 을 불러와서 사용

from airflow.models import Variable

bucket = Variable.get("data_bucket")

def process():
    print(f"Reading data from {bucket}")

PythonOperator(
    task_id="process_data",
    python_callable=process
)



 

Your team is currently supporting a legacy data pipeline that leverages homegrown tooling. 
You’re tasked with migrating this pipeline to Airflow. How would you approach this?

- legacy data pipeline 은 블랙박스같은 느낌이 있기 때문에 조심해서 migration 해야 함. migration 하다가 로직이 달라질 수 있기 때문
- homegrown tooling 에 대한 코드, 문서, 기획서, 참고자료 등이 있는지 찾아봄
- source 와 target 을 구체적으로 정리
- 해당 파이프라인이 다른 외부 서비스를 사용하는지도 체크
- 내부 코드를 하나하나 검토하여 어떤 로직으로 이루어졌는지 확인하고 airflow 에서 구현 진행
- 모듈 별로 migration 진행
- source, target 과 연결시 필요한 인증 정보 등을 새로 만들어서 연결
- migration 진행중에 airflow 에서 제공하는 더 나은 기능(retry, alarm 등)이 있다면 추가
- migration 진행 과정을 문서로 정확하게 남겨서 추후 이슈 발생시 참고할 수 있도록 함
- 실제 두 파이프라인의 결과를 비교 테스트해봄. 한 달 정도 실제 데이터 기반으로 테스트해보고 , 잘 migration 되었다면 legacy 를 없애는 방향으로 진행


shadow run 이란?
feature flag, dual write 란?



 

Outside of traditional Data Engineering workflows, 
what are other ways that Apache Airflow is being used by data teams?
 







참고

 

https://www.datacamp.com/blog/top-airflow-interview-questions

 

+ Recent posts