2.


airflow 의 장점
- 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG 로 만듦
- backfill 가능
- 원하는대로 스케줄링 가능

dag_id 는 DAG 를 나타내는 유니크한 값

각 오퍼레이터 (bashoperator, pythonoperator....) 는 하나의 태스크 수행하고
여러 오퍼레이터가 모여 DAG 를 구성

task 와 operator 차이점
operator 는 단일 작업을 수행하는 역할
taskoperator 의 상태를 관리하고 사용자에게 시작/완료 등의 상태 변경을 표시하는 Airflow 의 내장 컴포넌트

Operator : Airflow에서 수행될 작업의 템플릿 또는 추상적인 정의
즉, "어떤 작업을 할 것인가?"를 정의하는 코드 조각
Operator 자체는 직접 실행되지 않으며, Task에 의해 인스턴스화되어 실행됨

Task : DAG 내에서 실제로 실행되는 작업의 특정 인스턴스
즉, "어떤 Operator를 사용하여, 어떤 설정으로, 언제 실행할 것인가?"를 정의


Task는 Operator를 사용하여 작업을 정의함. 즉, Task는 Operator가 실행된 인스턴스
task 는 DAG 내에서 유일한 ID를 갖음



Airflow WebUI 화면에서
성공한 태스크는 초록색
실패한 태스크는 빨간색
앞 태스크의 성공을 기다리고 있는 태스크는 주황색으로 표시

실패한 태스크는 Clear 버튼으로 실패한 부분부터 다시 실행 가능


 

 

3.


종료 날짜가 없다면 DAG 의 스케줄링이 계속 진행됨

스케줄링(schedule_interval)은 "@daily" 처럼 넣거나 "0 2 * * *" 처럼 넣을 수 있음

또한 3일마다 (dt.timedelta(days=3)), 2시간마다(dt.timedelta(hours=2)) 처럼
crontab 에서 못 하는 간격 (빈도) 스케줄링을 Airflow 는 할 수 있음



증분 방식이란, 전체 데이터를 가져오기 보다는 내가 원하는 날짜의 데이터만 가져와서 처리하는 것
예를 들어 openAPI 를 그냥 사용하면 30일치 데이터를 들고오지만
정해진 날짜 파라미터에 값을 넣으면 해당 날짜 데이터만 가져옴
이렇게 매일 다른 날짜 처리를 하며 데이터를 쌓아나가는 것이 증분 방식

이를 위해 동적으로 바뀌는 날짜 값을 알아야 함
execution_date 가 바로 그것임

execution_date 는 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프임
헷갈리기 때문에 execution_date 는 deprecated 되고 대신 다른 값이 생김
실제로 실행하는 날짜 'data_interval_end' 가 됨


기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end
tomorrow_ds deprecate
tomorrow_ds_nodash deprecate
yesterday_ds deprecate
yesterday_ds_nodash deprecate
prev_execution_date prev_logical_date
next_execution_date next_logical_date

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/


백필은 catchup 이 true 일 때,
시작 시간부터 현재 시간까지의 모든 과거 스케줄 간격마다 모든 작업을 수행하는 기능

Airflow 태스크는 원자성과 멱등성을 갖어야 함
- 원자성 : task 가 성공적으로 수행하거나, 실패시 시스템에 영향이 없는 상태에서 실패해야 함
  예를 들어 중간 결과물을 만드는 태스크가 쓰는 도중에 실패할 경우, 뒤의 태스크가 결과물을 이용할 수 있는 상황이 발생
- 멱등성 : 여러번 실행해도 같은 결과를 도출해야 함
  예를 들어 결과값에 append 를 하는 태스크는 여러번 실행하면 결과값이 계속 늘어날 것임

 

 

4.


jinja 템플릿은 런타임 시에 사용 가능한 문자열 변수
다음과 같이 중괄호 두 개로 사용 가능

BashOperator(
  task_id="test",
  bash_command=(
    "echo {{ execution_date.year }}"
    "echo {{ '{:02}'.format(execution_date.month) }}"
  )
)

jinja 템플릿에 사용 가능한 변수들은 모두 태스크 콘텍스트에서 살펴볼 수 있음
operator 실행시 kwargs 를 통해 태스크 컨텍스트 변수들이 전달되어 사용 가능

op_kwargs는 dict
op_args는 list


DummyOperator 는 EmptyOperator 로 바뀌었음
아무 작업도 하지 않으며, 여러 다른 작업들을 그룹화(fan-out) 하는데 사용

mock 테스트 용도로 사용됨
테스트시 EmptyOperator 를 사용하고, 나중에 다른 Operator 로 변경하는 식

BranchPythonOperator 에서 일부 작업을 건너 뛰고 싶을 때 EmptyOperator 를 대신 사용
(빈 경로를 가질 수 없기 때문)

참고 https://stackoverflow.com/a/57486645



태스크간 작은 데이터를 전달할 때 xcom 을 사용 가능(메타스토어에 저장되고, 메타스토어마다 저장 가능 크기가 다름)
태스크간 큰 데이터를 전달할 때, 모든 태스크가 접근 가능한 스토리지 사용하여 전달

하나의 Task 가 xcom(피클링)을 통해 데이터를 메타스토어에 저장하고,
다른 Task 에서 xcom 이 저장한 데이터를 메타스토어로부터 읽는 것으로 데이터를 교환함

피클 Picke 이란, Python 객체를 디스크에 저장할 수 있게 하는 직렬화 프로토콜
피클링된 객체는 메타스토어의 블록 blob 에 저장되기 때문에
일반적으로 문자열 몇 개와 같은 작은 데이터 전송에만 적용 가능

Task 간 좀 더 큰 데이터를 교환 할 때는 xcom 이 아닌 외부 데이터베이스를 사용해야 함

xcom 혹은 스토리지로 데이터 전달시,

dag 에 나타나지 않는 의존관계
를 만듦
예를 들어 A태스크에서 토큰을 받아 xcom 에 넣었고 B태스크에서 꺼내 쓰는 경우
A와 B 사이에 의존 관계가 형성됨

원자성을 무너뜨릴 수 있음
예를 들어 B태스크에서 사용하려고 토큰을 꺼내보니 기간만료라서 사용 불가한 경우

xcom 에 저장되는 데이터는 반드시 직렬화가 가능해야 함
예를 들어 파이썬 람다는 xcom 에 저장 불가

xcom 을 직접 사용하는 대신
taskflow API 를 사용하여 데이터를 넘길 수 있음
마치 python 함수 파라미터로 데이터를 받는 느낌


 

5.


fan-out 구조 의존성

task_start = EmptyOperator(task_id="start")
task_start >> [task_A, task_B]

fan-in 구조 의존성

[task_A, task_B] >> task_finish



코드 내용에 따라 의존성 결정하기 위해 BranchPythonOperator 사용

    def choose_branch(**context):
        if context["execution_date"] == THE_DATE:
            return "b1"
        if context["execution_date"] > THE_DATE:
            return "b2"
        if context["execution_date"] < THE_DATE:
            return "b3"

    branching = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch)
    b1 = BashOperator(task_id='b1', bash_command='echo b1')
    b2 = BashOperator(task_id='b2', bash_command='echo b2')
    b3 = BashOperator(task_id='b3', bash_command='echo b3')
    c1 = BashOperator(task_id='c1', bash_command='echo c1')

    start_dag >> branching >> [b1, b2, b3]
    b1 >> c1


참고 https://wefree.tistory.com/38



트리거 규칙은 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수 조건

all_success (default): All upstream tasks have succeeded
all_failed: All upstream tasks are in a failed or upstream_failed state
all_done: All upstream tasks are done with their execution
one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
always: No dependencies at all, run this task at any time

 

6.


센서는 특정 조건이 true 인지 지속적으로 확인하고
조건이 true 라면 지정된 작업을 수행

센서는 상태가 true 가 될 때까지 혹은 타임아웃이 될 때까지 계속 확인 진행
true가 되거나 타임아웃이 지나면 확인을 멈춤


filesensor 의 경우 wildcard 를 사용해 파일을 감시하는 것이 가능함

파일이 writing 되는 도중에 filesensor 가 감지하고 그 불완전한 파일을 사용할 수 있기 때문에
파일이 성공적으로 쓰여지면 파일 이름에 success 등의 태그를 붙임
filesensor 가 success 태그가 붙는 파일만 감지하도록 만듦
이 방법은 filesensor 의 오용을 피하는 방법도 되지만,
Operator 의 원자성을 지키기 위한 방법도 됨(불완전한 파일이 시스템에 영향을 주지 않도록)

filesensor 대신 pythonsensor 사용도 가능
python 코드를 이용해서 특정 path 에 특정 파일이 저장되었는지 확인하는 센서이며
조건이 true 라면 true 를 return 함



센서는 (기본) 7일을 넘기면 timeout 이 나면서 실패하게 됨

매일 센서가 스케줄링되면, 7일동안 7개의 센서가 실행되게 되므로
리소스를 잡아먹으며, 새롭게 실행되어야 할 태스크가 실행되지 않는(센서 데드락) 원인이 됨
따라서 센서를 실행할 때는 reschedule 모드를 설정
센서 reschedule 모드 poke 동작을 실행할 때만 태스크 슬롯을 차지하며,
대기 시간 동안은 슬롯을 차지하지 않음

태스크 실행 수 제한 참고 : https://eyeballs.tistory.com/534


< poke 모드 >
Sensor 태스크는 주기적으로 (일정 간격, poke_interval) 외부 시스템의 조건을 확인하는 poke() 메서드를 실행
poke() 메서드가 True를 반환하면 센서의 대기 상태가 종료되고 다음 태스크로 진행
poke() 메서드가 False를 반환하면 센서는 poke_interval 동안 잠시 대기한 후 다시 poke() 함수를 호출하여 조건을 재확인
이것을 폴링(polling) 방식이라고 하며, True 성공하거나 timeout 으로 설정된 시간 초과 할 때까지 계속됨
대기하는 동안 (poke_interval 동안)에도 센서 태스크는 계속 실행 상태를 유지, 주기적으로 외부 시스템에 연결하여 상태를 확인
따라서 대기 시간이 길어질수록 불필요한 리소스(CPU, 네트워크 연결 등) 소비
많은 수의 센서가 poke 모드로 긴 시간 동안 대기하면, Airflow 스케줄러에 부담
왜냐면 각 센서 태스크의 상태를 지속적으로 확인하고 관리해야해서
하지만 센서 확인이 빠르게 처리된다는 장점이 있음

    poke_sensor = ExternalTaskSensor(
        task_id='poke_mode_sensor',
        external_dag_id='upstream_dag',
        external_task_id='success_task',
        poke_interval=60,  # 60초마다 poke() 호출
        timeout=3600,
        mode='poke',  # 명시적으로 poke 모드 설정 (기본값)
    )


< reschedule 모드 >
Sensor 태스크의 poke() 메서드가 False를 반환하면, 센서는 실행 중인 태스크 슬롯을 해제
그리고 스케줄러에게 "나중에 다시 실행하라(reschedule)"고 요청
스케줄러는 센서 태스크를 다시 스케줄링하여 나중에 (일정 시간 후 또는 다음 스케줄링 주기) 다시 실행을 시도함
poke() 메서드가 True를 반환하면 센서의 대기 상태가 종료되고 다음 태스크로 진행(poke 모드와 동일)
timeout 파라미터는 reschedule 모드에서도 적용되어, 센서가 지정된 시간 동안 성공하지 못하면 실패 처리됨
센서가 대기하는 동안 태스크 슬롯을 점유하지 않고 스케줄러에 부하가 없다는 장점
하지만 스케줄러가 다시 실행해야하니, 센서 확인이 느린 편


    reschedule_sensor = ExternalTaskSensor(
        task_id='reschedule_mode_sensor',
        external_dag_id='long_running_dag',
        external_task_id='final_task',
        poke_interval=300,  # 5분마다 poke() 호출 (재스케줄링 시점과 다를 수 있음)
        timeout=7200,
        mode='reschedule',  # reschedule 모드 설정
    )



TriggerDagRunOperator 를 사용하여 Dag1 의 태스크에서 Dag2 를 실행할 수 있음
트리거되는 Dag2 는 schedule_interval 이 필요하지 않아서 None 값을 갖음

백필에 의해 Dag1 이 동작하면 이전에 실행된 task 상태는 삭제되고 새로운 태스크가 실행됨
이에 따라 Dag2 가 트리거되어 실행되는데,
트리거되는 Dag2 에서는 이전에 실행된 태스크가 삭제되지 않고 새로운 태스크가 생성

dag 간 fan-in 은 TriggerDagRunOperator 를 통해 구현할 수 없음
예를 들어, dag1, dag2 가 모두 완료되어야 dag3 이 구동되도록 하는 fan-in 은 TriggerDagRunOperator 를 통해 구현이 불가능
이런 경우엔 dag3 에 "dag1이 마무리되는지 확인하는 센서"와 "dag2가 마무리되는지 확인하는 센서"를 두고
두 센서를 기준으로 fan-in ( [ s1, s2 ] >> o ) 으로 연결하면 됨



dag 실행시 dag_run 에 run_id 값이 뜸
run_id 값을 보고 해당 dag 가 어떻게 실행되었는지 알 수 있음

schedule__ : dag가 스케줄되어 실행됨
backfill__ : dag 가 백필에 의해 실행됨
manual__ : dag 가 수동으로 실행됨



Airflow 는 DAG 간 의존성을 관리하는 방법을 제공하지 않음
예를 들어 dag1, dag2 가 모두 성공하면 dag3 가 트리거되어 실행되는 경우
[dag1,dag2] >> [dag3] 처럼 의존성을 관리하는 방법은 없음

다른 방법으로 의존성을 만들 수 있음
dag3 에서 ExternalTaskSensor 를 사용하면 됨
ExternalTaskSensor 는 dag1 과 dag2 의 태스크를 살펴보다가, 둘 다 성공하게 되면 dag3 의 태스크를 실행함

이 방법을 사용하기 위해서는
dag3 의 ExternalTaskSensor 실행 날짜와
dag1, dag2 의 태스크 실행 날짜가 완벽하게 동일해야 함
그렇지 않으면 ExternalTaskSensor 가 dag1,2 의 태스크를 찾지 못 함

실행 날짜가 다르다면, 다른 만큼 차이값을 ExternalTaskSensor 에 미리 넣어주어야 함
만약 dag1,2 태스크 실행 시간이 매일 10시이고
dag3 ExternalTaskSensor 실행 시간이 매일 12시 라면,
2시간 차이가 난다고 ExternalTaskSensor 에 미리 알려주어 실행 날짜를 맞춤




TriggerDagRunOperator 을 이용하지 않고,
다른 방법으로 dag 를 트리거 할 수 있음
바로 CLI 와 RestAPI
이 두 가지 방법으로 dag 를 트리거하면, 트리거 하는 순간에만 dag 가 실행되고
dag가 꾸준히 스케줄링되지는 않음

다음과 같은 CLI 명령으로 dag1 대그 실행 가능

airflow dags trigger dag1
airflow dags trigger --conf '{"age": 12, "sex": "M"}' dag1

RestAPI 도 사용 가능


 

 

 

9.


테스트에 대한 내용
다시 보자


 

 

11.

모범 사례
마지막에 볼 것

 

12.


Airflow 아키텍처는 최소 세 가지 요소를 갖추고 있음

- 웹서버 : Airflow 의 프로세스
  파이프라인 상태정보 시각화
  사용자가 dag 수행 및 관리
  메타스토어의 직렬화된 dags 를 읽고 사용

- 스케줄러 : Airflow 의 프로세스
  dags 가 있는 dir 에 접근 가능해야 함
  dags 를 읽고 구문 분석 후 메타데이터에 저장
  실행할 태스크 결정하고 대기열에 배치

- 메타스토어(데이터베이스) : 메타데이터 저장
  직렬화된 dags 를 저장하고 있음



4 가지 익스큐터 유형

- SequenctialExecutor
  분산 환경에서 사용 불가능
  테스트용

- LocalExecutor
  분산 환경에서 사용 불가능
  단일 호스트용

- CeleryExecutor
  분산 환경에서 사용 가능
  멀티 호스트용

-KubernetesExecutor
  분산 환경에서 사용 불가능
  쿠버네티스 기반 컨테이너용



LocalExecutor 는 단일 머신에서 태스크 병렬 실행이 가능
기본적으로 워커 프로세스는 최대 32개 병렬 실행 가능
즉, 단일 머신에서 여러 워커가 동작하며 병렬 실행

CeleryExecutor 는 내부적으로 Celery 를 이용하여 실행할 태스크들에 대해 대기열을 등록
이후, 워커가 대기열에 등록된 태스크를 읽고 개별적으로 처리
즉, 다중 머신에서 여러 워커가 동작하며 분산 병렬 실행
대기열 메커니즘을 위해 redis, rabbitmq 등의 인메모리 데이터베이스 사용

KubernetesExecutor 는 쿠버네티스에서 워크로드 실행
airflow dag 의 각 태스크는 독립된 쿠버네티스 파드에서 실행

[그림 설명 링크]



airflow 에서 일어나는 모든 일은 메타스토어에 저장됨
airflow 는 SQLAlchemy 를 이용하여 외부 Database 에 접근 및 태스크를 수행하여 메타스토어로 사용
메타스토어는 PostgreSQL 이나 MySQL 을 권장. SQLite 는 동시 쓰기가 지원되지 않음



스케줄러를 자세히 알아봄

스케줄러 역할은 다음과 같음
- dag 파일 구문 분석 후, 추출된 정보는 메타스토어에 저장
- 실행할 준비가 된 태스크를 결정하고 이를 대기 상태로 전환
- 대기 상태의 태스크 가져와서 실행(익스큐터에게 넘기나 봄)

스케줄러는 dag 디렉터리를 주기적(while loop)으로 확인하고 dags 파일 처리
dags 파일이 변경되지 않아도 주기적으로 확인
변경된 dags 이 있으면 구문분석 후 메타스토어에 저장

스케줄러가 dags 읽고 처리하는 데 CPU 가 소모됨
처리 주기를 줄이면 CPU 가 더 소모됨
처리 주기, 구문 분석에 사용될 프로세스 수 변경 등 옵션이 존재함

스케줄러는 실행할 태스크 인스턴스를 결정하는 역할도 함
각 태스크 인스턴스에 대해 모든 태스크가 종속성이 충족되는지,
모든 태스크가 정상적으로 마지막 단계까지 진행되었는지 확인 후
다음 순서로 실행되어야 하는 태스크들을 예약

실행이 예약된 태스크들은
해당 태스크에 쓸 수 있는 슬롯에 여유가 있을 때 (실행을 위해) 대기열에 넣어짐
태스크 인스턴스가 대기열에 배치되면 더 이상 스케줄러의 통제를 받지 않음
스케줄러의 역할은 실행 가능한 태스크 확인 후 예약, 그리고 대기열에 넣어주는 것 까지

태스크 익스큐터 프로세스가 그 다음 작업을 진행함
익스큐터(Local, Celery 등) 는 대기열에 태스크가 올 때 까지 기다림
익스큐터가
대기열에서 태스크 인스턴스 읽고, 워커에서 태스크 실행
익스큐터에서 태스크를 실행한다는 것은,
익스큐터에서 실행중인 태스크가 실패해도 airflow 자체에 영향이 가지 않도록 태스크를 실행할 새 프로세스를 만드는 것을 의미함

태스크의 상태는 (메타스토어에 저장되는) 하트비트로 알 수 있음



익스큐터는 스케줄러의 일부분
하나의 airflow 환경에서 사용 가능한 익스큐터 종류는 단 하나
태스크가 '실행 중' 상태로 바뀌었다는 말은, 익스큐터가 해당 태스크를 담당하여 실행하고 있다는 말




dag 의 task 실행이 실패하면
on_failure_callback 에 등록한 함수를 실행하도록 할 수 있음
여기 등록된 함수 내에서 slack 이나 email 로 메세지를 보내도록 하면 됨

성공시 콜백되는 함수를 등록하는 on_success_callback
태스크 재시도시 콜백되는 함수를 등록하는 on_retry_callback 도 존재함



 

 

 

 

 


DagRun 
- Task 인스턴스들을 DAG에 정의 된 특정 execution_date 에 실행하는 DAG의 인스턴스
  단순하게 표현하면 DAG의 실행 이력을 의미
  실행 중이거나 종료된 상태를 의미
- 원자성을 갖음 : 다른 리소스가 필요하지 않음
- 멱등성을 갖음 : 반복해도 동일한 결과를 보여줌

 

두번째, 세번째, 네번째 dagrun 이 실행하지 못하다가 5일날 모두 catchup 되었다면,

execution_date 는 원래 실행되었어야 했던 그 날의 execution_date 를 갖고

start_date 는 catchup 한 날짜(5일)이 됨

 

 

Airflow 3버전 의 핵심 구성 요소
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html#architecture-overview

< A scheduler >

- 스케줄링 된 dag 를 트리거(실행)하는 역할
- 실행된 task 를 executor 에 전달하는 역할
- (개발자가 작성한) DAG를 (1분에 한 번씩) 파싱/직렬화 후 metadata에 넣고, 스케줄에 따라 DAG Run을 생성하며, 태스크의 의존성을 확인하여 실행 가능한 태스크를 Executor에 전달
- executor 는 scheduler 내에 존재하기 때문에, executor = scheduler 라고 이해해도 무방함


< An executor >

- executor 는 독립적인 구성 요소가 아니라, scheduler 프로세스에 포함되어 동작하는 녀석임 (태스크를 실행하는 방식을 정의하는 인터페이스)
- executor 는 task 가 실행되는 매커니즘을 정의함. task 가 어떻게 실행될지 그리고 어떤 시스템 위에서 실행될지(kube 에서 실행될지, celery에서 실행될지, local machine 에서 실행될지) 결정함
- executor 는 task 를 Queue에 넣음. 이 queue 에 들어간 tasks 는 나중에 worker 가 읽어서 실행함
  (이름이 헷갈리지만, executor는 task 를 직접적으로 실행하는 녀석이 아님. 단지 task 를 어떻게 실행할지 정의하는 녀석임)
- executor 는 설정된 방식에 따라 태스크를 수행하도록 worker 에 task 를 (queue를 통해서) 전달하고,
  worker 가 task 를 수행한(수행하는) 결과를 metastore 에 저장함 (그래서 사용자가 WebUI 를 통해 task 진행 과정을 볼 수 있음)

- executor 는 common API 를 갖추고 있음
  개발자가 설정한대로 executor 가 동작함
- airflow.cfg 에 원하는 executor 를 아래와 같이 설정 가능
  [core]
  executor = KubernetesExecutor
- 현재 airflow 가 어떤 executor 를 사용중인지 아래 명령어로 확인 가능
  $ airflow config get-value core executor
  LocalExecutor



< A worker >

- Executor가 지시한 대로 실제로 태스크를 실행하는 '물리적인 프로세스' 또는 '컴퓨팅' 단위. worker 라는 컴포넌트가 따로 있는게 아니라 그냥 task 를 실행하는 프로세스를 싸잡아서 worker 라고 부름
- Executor 가 어떤 방식으로 task 를 넘겨주었는가에 따라 'worker' 가 달라짐
- Sequential Executor 가 사용된 경우, worker 프로세스가 따로 존재하지 않음. 스케줄러 프로세스 자체에서 task 를 수행함 (스케줄러 프로세스 내의 단일 스레드에서 태스크를 순차적으로 실행. 즉 한 번에 하나의 task 만 실행 가능)
- Local Executor 가 사용된 경우, 스케줄러가 관리하는 별도의 프로세스들이 worker 가 되어 tasks 를 수행함 (스케줄러 프로세스는 태스크를 실행하기 위해 별도의 자식 프로세스를 생성하고 관리. 이 자식 프로세스들이 워커의 역할을 하지만, 스케줄러 프로세스에 의해 제어됨. 즉, 별도의 여러 python 프로세스를 생성 후 병렬로 task 실행 가능. 스레드를 사용하지 않는 이유는 python 특유의 고질병인 GIL 때문) 스케줄러와 리소스를 공유하기 때문에 스케줄러 성능에 영향을 미칠 수 있음
- Celery Executor 가 사용된 경우, 독립적인 서버 (혹은 가상 머신) 에서 실행되는 Celery worker 프로세스들이 task 를 수행함. 스케줄러와 별도임
- Kube Executor 가 사용된 경우, kube 클러스터 내에 동적으로 생성된 pods 가 worker 가 되어 task 를 수행함

- worker로 운영될 프로세스 (또는 Pod)의 수, 컴퓨팅 리소스는 Airflow 및 관련 외부 시스템의 설정 파일(airflow.cfg 등)에 정의됨
  따라서 개발자가 직접 worker 의 수를 조절하거나, 얼마만큼의 cpu를 사용할지 등을 구체적으로 정할 수 있음



< A dag processor >

- (개발자가 작성하여 넣은) dag 파일들을 분석하고 직렬화하여 메타데이터 DB에 넣는 역할
- 해당 기능은 scheduler 에 통합됨



< A webserver >

- 사용자가 볼 수 있고, 접근 및 dag 실행 등을 할 수 있는 WebUI 를 제공



< A folder of DAG files >

- 어떤 tasks 가 실행되고 언제 실행될지 등을 파악하기 위해 scheduler 에 의해 읽혀지는 것.



< A metadata database >

- airflow 구성 요소들이 만든 workflows 나 tasks의 상태를 저장하기 위한 저장소.




< DagRun >

- dag 가 실행된 후 생성되는 dag 의 인스턴스. Java 에서 new 로 생성하는 그런 종류의 인스턴스는 아님. 그냥 metastore 의 dag_run 테이블에 여러 정보가 row 로 저장된 것 이라고 이해하면 됨
- dag 의 실행 이력 이라고 보면 됨
- dag 를 실행했을 때의 정보(실행 id, 실행 날짜, 시작 시간, 종료 시간, dag 상태, 트리거 정보 등)를 갖고 있어서
  dagrun 을 보면 "이 dag 가 이 때 이런식으로 동작했구나"를 알 수 있음


 

 


airflow 는 외부 프로그램을 실행하기 위해,
해당 프로그램이 제공하는 다양한 인터페이스(API, 드라이버, 클라이언트 라이브러리, RPC 등)를 활용함
외부 인터페이스에 접근하기 위한 Connection 정보는 metastore 에 미리 저장하고 있음

예를 들어, airflow 가 외부의 mysql 에서 실행할 작업을 트리거 싶을 때
airflow 는 (mysql 연결을 위한 driver 가 구현된) MySQLOperator를 사용함
혹은 직접 mysqlclient 을 구현해서 연결할 수 있음

다른 예로, airflow 가 외부의 spark 에서 실행할 작업을 트리거 하고 싶을 때
airflow 는 (spark master 노드에 연결 가능한) SparkSubmitOperator 를 사용함

만약 외부 프로그램에서 인터페이스(API)를 제공하지 않는 경우,
RPC와 같은 기술을 사용하여(grpc 등) Airflow에서 원격으로 함수나 프로시저를 호출할 수 있음


외부 프로그램과 통신하기 위한 airflow operator 가 필요로하는 정보(connection 정보. 이를테면 host ip, port, id & pw, 인증 정보 등)은 개발자가 알맞게 넣어주어야 함


 

 

 

 

 

 

 

+ Recent posts