2.


airflow 의 장점
- 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG 로 만듦
- backfill 가능
- 원하는대로 스케줄링 가능

dag_id 는 DAG 를 나타내는 유니크한 값

각 오퍼레이터 (bashoperator, pythonoperator....) 는 하나의 태스크 수행하고
여러 오퍼레이터가 모여 DAG 를 구성

task 와 operator 차이점
operator 는 단일 작업을 수행하는 역할
taskoperator 의 상태를 관리하고 사용자에게 시작/완료 등의 상태 변경을 표시하는 Airflow 의 내장 컴포넌트


Airflow WebUI 화면에서
성공한 태스크는 초록색
실패한 태스크는 빨간색
앞 태스크의 성공을 기다리고 있는 태스크는 주황색으로 표시

실패한 태스크는 Clear 버튼으로 실패한 부분부터 다시 실행 가능


 

 

3.


종료 날짜가 없다면 DAG 의 스케줄링이 계속 진행됨

스케줄링(schedule_interval)은 "@daily" 처럼 넣거나 "0 2 * * *" 처럼 넣을 수 있음

또한 3일마다 (dt.timedelta(days=3)), 2시간마다(dt.timedelta(hours=2)) 처럼
crontab 에서 못 하는 간격 (빈도) 스케줄링을 Airflow 는 할 수 있음



증분 방식이란, 전체 데이터를 가져오기 보다는 내가 원하는 날짜의 데이터만 가져와서 처리하는 것
예를 들어 openAPI 를 그냥 사용하면 30일치 데이터를 들고오지만
정해진 날짜 파라미터에 값을 넣으면 해당 날짜 데이터만 가져옴
이렇게 매일 다른 날짜 처리를 하며 데이터를 쌓아나가는 것이 증분 방식

이를 위해 동적으로 바뀌는 날짜 값을 알아야 함
execution_date 가 바로 그것임

execution_date 는 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프임
헷갈리기 때문에 execution_date 는 deprecated 되고 대신 다른 값이 생김
실제로 실행하는 날짜 'data_interval_end' 가 됨


기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end
tomorrow_ds deprecate
tomorrow_ds_nodash deprecate
yesterday_ds deprecate
yesterday_ds_nodash deprecate
prev_execution_date prev_logical_date
next_execution_date next_logical_date

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/


백필은 catchup 이 true 일 때,
시작 시간부터 현재 시간까지의 모든 과거 스케줄 간격마다 모든 작업을 수행하는 기능

Airflow 태스크는 원자성과 멱등성을 갖어야 함
- 원자성 : task 가 성공적으로 수행하거나, 실패시 시스템에 영향이 없는 상태에서 실패해야 함
  예를 들어 중간 결과물을 만드는 태스크가 쓰는 도중에 실패할 경우, 뒤의 태스크가 결과물을 이용할 수 있는 상황이 발생
- 멱등성 : 여러번 실행해도 같은 결과를 도출해야 함
  예를 들어 결과값에 append 를 하는 태스크는 여러번 실행하면 결과값이 계속 늘어날 것임

 

 

4.


jinja 템플릿은 런타임 시에 사용 가능한 문자열 변수
다음과 같이 중괄호 두 개로 사용 가능

BashOperator(
  task_id="test",
  bash_command=(
    "echo {{ execution_date.year }}"
    "echo {{ '{:02}'.format(execution_date.month) }}"
  )
)

jinja 템플릿에 사용 가능한 변수들은 모두 태스크 콘텍스트에서 살펴볼 수 있음
operator 실행시 kwargs 를 통해 태스크 컨텍스트 변수들이 전달되어 사용 가능

op_kwargs는 dict
op_args는 list


DummyOperator 는 EmptyOperator 로 바뀌었음
아무 작업도 하지 않으며, 여러 다른 작업들을 그룹화(fan-out) 하는데 사용

mock 테스트 용도로 사용됨
테스트시 EmptyOperator 를 사용하고, 나중에 다른 Operator 로 변경하는 식

BranchPythonOperator 에서 일부 작업을 건너 뛰고 싶을 때 EmptyOperator 를 대신 사용
(빈 경로를 가질 수 없기 때문)

참고 https://stackoverflow.com/a/57486645



태스크간 작은 데이터를 전달할 때 xcom 을 사용 가능(메타스토어에 저장되고, 메타스토어마다 저장 가능 크기가 다름)
태스크간 큰 데이터를 전달할 때, 모든 태스크가 접근 가능한 스토리지 사용하여 전달

xcom 혹은 스토리지로 데이터 전달시,

dag 에 나타나지 않는 의존관계
를 만듦
예를 들어 A태스크에서 토큰을 받아 xcom 에 넣었고 B태스크에서 꺼내 쓰는 경우
A와 B 사이에 의존 관계가 형성됨

원자성을 무너뜨릴 수 있음
예를 들어 B태스크에서 사용하려고 토큰을 꺼내보니 기간만료라서 사용 불가한 경우

xcom 에 저장되는 데이터는 반드시 직렬화가 가능해야 함
예를 들어 파이썬 람다는 xcom 에 저장 불가

xcom 을 직접 사용하는 대신
taskflow API 를 사용하여 데이터를 넘길 수 있음
마치 python 함수 파라미터로 데이터를 받는 느낌


 

5.


fan-out 구조 의존성

task_start = EmptyOperator(task_id="start")
task_start >> [task_A, task_B]

fan-in 구조 의존성

[task_A, task_B] >> task_finish


코드 내용에 따라 의존성 결정하기 위해 BranchPythonOperator 사용

    def choose_branch(**context):
        if context["execution_date"] == THE_DATE:
            return "b1"
        if context["execution_date"] > THE_DATE:
            return "b2"
        if context["execution_date"] < THE_DATE:
            return "b3"

    branching = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch)
    b1 = BashOperator(task_id='b1', bash_command='echo b1')
    b2 = BashOperator(task_id='b2', bash_command='echo b2')
    b3 = BashOperator(task_id='b3', bash_command='echo b3')
    c1 = BashOperator(task_id='c1', bash_command='echo c1')

    start_dag >> branching >> [b1, b2, b3]
    b1 >> c1


참고 https://wefree.tistory.com/38



트리거 규칙은 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수 조건

all_success (default): All upstream tasks have succeeded
all_failed: All upstream tasks are in a failed or upstream_failed state
all_done: All upstream tasks are done with their execution
one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
always: No dependencies at all, run this task at any time

 

6.


센서는 특정 조건이 true 인지 지속적으로 확인하고
조건이 true 라면 지정된 작업을 수행

센서는 상태가 true 가 될 때까지 혹은 타임아웃이 될 때까지 계속 확인 진행
true가 되거나 타임아웃이 지나면 확인을 멈춤


filesensor 의 경우 wildcard 를 사용해 파일을 감시하는 것이 가능함

파일이 writing 되는 도중에 filesensor 가 감지하고 그 불완전한 파일을 사용할 수 있기 때문에
파일이 성공적으로 쓰여지면 파일 이름에 success 등의 태그를 붙임
filesensor 가 success 태그가 붙는 파일만 감지하도록 만듦
이 방법은 filesensor 의 오용을 피하는 방법도 되지만,
Operator 의 원자성을 지키기 위한 방법도 됨(불완전한 파일이 시스템에 영향을 주지 않도록)

filesensor 대신 pythonsensor 사용도 가능
python 코드를 이용해서 특정 path 에 특정 파일이 저장되었는지 확인하는 센서이며
조건이 true 라면 true 를 return 함



센서는 (기본) 7일을 넘기면 timeout 이 나면서 실패하게 됨

매일 센서가 스케줄링되면, 7일동안 7개의 센서가 실행되게 되므로
리소스를 잡아먹으며, 새롭게 실행되어야 할 태스크가 실행되지 않는(센서 데드락) 원인이 됨
따라서 센서를 실행할 때는 reschedule 모드를 설정
센서 reschedule 모드 poke 동작을 실행할 때만 태스크 슬롯을 차지하며,
대기 시간 동안은 슬롯을 차지하지 않음

태스크 실행 수 제한 참고 : https://eyeballs.tistory.com/534



TriggerDagRunOperator 를 사용하여 Dag1 의 태스크에서 Dag2 를 실행할 수 있음
트리거되는 Dag2 는 schedule_interval 이 필요하지 않아서 None 값을 갖음

백필에 의해 Dag1 이 동작하면 이전에 실행된 task 상태는 삭제되고 새로운 태스크가 실행됨
이에 따라 Dag2 가 트리거되어 실행되는데,
트리거되는 Dag2 에서는 이전에 실행된 태스크가 삭제되지 않고 새로운 태스크가 생성



dag 실행시 dag_run 에 run_id 값이 뜸
run_id 값을 보고 해당 dag 가 어떻게 실행되었는지 알 수 있음

schedule__ : dag가 스케줄되어 실행됨
backfill__ : dag 가 백필에 의해 실행됨
manual__ : dag 가 수동으로 실행됨



Airflow 는 DAG 간 의존성을 관리하는 방법을 제공하지 않음
예를 들어 dag1, dag2 가 모두 성공하면 dag3 가 트리거되어 실행되는 경우
[dag1,dag2] >> [dag3] 처럼 의존성을 관리하는 방법은 없음

다른 방법으로 의존성을 만들 수 있음
dag3 에서 ExternalTaskSensor 를 사용하면 됨
ExternalTaskSensor 는 dag1 과 dag2 의 태스크를 살펴보다가, 둘 다 성공하게 되면 dag3 의 태스크를 실행함

이 방법을 사용하기 위해서는
dag3 의 ExternalTaskSensor 실행 날짜와
dag1, dag2 의 태스크 실행 날짜가 완벽하게 동일해야 함
그렇지 않으면 ExternalTaskSensor 가 dag1,2 의 태스크를 찾지 못 함

실행 날짜가 다르다면, 다른 만큼 차이값을 ExternalTaskSensor 에 미리 넣어주어야 함
만약 dag1,2 태스크 실행 시간이 매일 10시이고
dag3 ExternalTaskSensor 실행 시간이 매일 12시 라면,
2시간 차이가 난다고 ExternalTaskSensor 에 미리 알려주어 실행 날짜를 맞춤




TriggerDagRunOperator 을 이용하지 않고,
다른 방법으로 dag 를 트리거 할 수 있음
바로 CLI 와 RestAPI
이 두 가지 방법으로 dag 를 트리거하면, 트리거 하는 순간에만 dag 가 실행되고
dag가 꾸준히 스케줄링되지는 않음

다음과 같은 CLI 명령으로 dag1 대그 실행 가능

airflow dags trigger dag1
airflow dags trigger --conf '{"age": 12, "sex": "M"}' dag1

RestAPI 도 사용 가능


 

 

 

9.


테스트에 대한 내용
다시 보자


 

 

11.

모범 사례
마지막에 볼 것

 

12.


Airflow 아키텍처는 최소 세 가지 요소를 갖추고 있음

- 웹서버 : Airflow 의 프로세스
  파이프라인 상태정보 시각화
  사용자가 dag 수행 및 관리
  메타스토어의 직렬화된 dags 를 읽고 사용

- 스케줄러 : Airflow 의 프로세스
  dags 가 있는 dir 에 접근 가능해야 함
  dags 를 읽고 구문 분석 후 메타데이터에 저장
  실행할 태스크 결정하고 대기열에 배치

- 메타스토어(데이터베이스) : 메타데이터 저장
  직렬화된 dags 를 저장하고 있음



4 가지 익스큐터 유형

- SequenctialExecutor
  분산 환경에서 사용 불가능
  테스트용

- LocalExecutor
  분산 환경에서 사용 불가능
  단일 호스트용

- CeleryExecutor
  분산 환경에서 사용 가능
  멀티 호스트용

-KubernetesExecutor
  분산 환경에서 사용 불가능
  쿠버네티스 기반 컨테이너용



LocalExecutor 는 단일 머신에서 태스크 병렬 실행이 가능
기본적으로 워커 프로세스는 최대 32개 병렬 실행 가능
즉, 단일 머신에서 여러 워커가 동작하며 병렬 실행

CeleryExecutor 는 내부적으로 Celery 를 이용하여 실행할 태스크들에 대해 대기열을 등록
이후, 워커가 대기열에 등록된 태스크를 읽고 개별적으로 처리
즉, 다중 머신에서 여러 워커가 동작하며 분산 병렬 실행
대기열 메커니즘을 위해 redis, rabbitmq 등의 인메모리 데이터베이스 사용

KubernetesExecutor 는 쿠버네티스에서 워크로드 실행
airflow dag 의 각 태스크는 독립된 쿠버네티스 파드에서 실행

[그림 설명 링크]



airflow 에서 일어나는 모든 일은 메타스토어에 저장됨
airflow 는 SQLAlchemy 를 이용하여 외부 Database 에 접근 및 태스크를 수행하여 메타스토어로 사용
메타스토어는 PostgreSQL 이나 MySQL 을 권장. SQLite 는 동시 쓰기가 지원되지 않음



스케줄러를 자세히 알아봄

스케줄러 역할은 다음과 같음
- dag 파일 구문 분석 후, 추출된 정보는 메타스토어에 저장
- 실행할 준비가 된 태스크를 결정하고 이를 대기 상태로 전환
- 대기 상태의 태스크 가져와서 실행(익스큐터에게 넘기나 봄)

스케줄러는 dag 디렉터리를 주기적(while loop)으로 확인하고 dags 파일 처리
dags 파일이 변경되지 않아도 주기적으로 확인
변경된 dags 이 있으면 구문분석 후 메타스토어에 저장

스케줄러가 dags 읽고 처리하는 데 CPU 가 소모됨
처리 주기를 줄이면 CPU 가 더 소모됨
처리 주기, 구문 분석에 사용될 프로세스 수 변경 등 옵션이 존재함

스케줄러는 실행할 태스크 인스턴스를 결정하는 역할도 함
각 태스크 인스턴스에 대해 모든 태스크가 종속성이 충족되는지,
모든 태스크가 정상적으로 마지막 단계까지 진행되었는지 확인 후
다음 순서로 실행되어야 하는 태스크들을 예약

실행이 예약된 태스크들은
해당 태스크에 쓸 수 있는 슬롯에 여유가 있을 때 (실행을 위해) 대기열에 넣어짐
태스크 인스턴스가 대기열에 배치되면 더 이상 스케줄러의 통제를 받지 않음
스케줄러의 역할은 실행 가능한 태스크 확인 후 예약, 그리고 대기열에 넣어주는 것 까지

태스크 익스큐터 프로세스가 그 다음 작업을 진행함
익스큐터(Local, Celery 등) 는 대기열에 태스크가 올 때 까지 기다림
익스큐터가
대기열에서 태스크 인스턴스 읽고, 워커에서 태스크 실행
익스큐터에서 태스크를 실행한다는 것은,
익스큐터에서 실행중인 태스크가 실패해도 airflow 자체에 영향이 가지 않도록 태스크를 실행할 새 프로세스를 만드는 것을 의미함

태스크의 상태는 (메타스토어에 저장되는) 하트비트로 알 수 있음



익스큐터는 스케줄러의 일부분
하나의 airflow 환경에서 사용 가능한 익스큐터 종류는 단 하나
태스크가 '실행 중' 상태로 바뀌었다는 말은, 익스큐터가 해당 태스크를 담당하여 실행하고 있다는 말




dag 의 task 실행이 실패하면
on_failure_callback 에 등록한 함수를 실행하도록 할 수 있음
여기 등록된 함수 내에서 slack 이나 email 로 메세지를 보내도록 하면 됨

성공시 콜백되는 함수를 등록하는 on_success_callback
태스크 재시도시 콜백되는 함수를 등록하는 on_retry_callback 도 존재함



 

 

 

 

 


DagRun 
- Task 인스턴스들을 DAG에 정의 된 특정 execution_date 에 실행하는 DAG의 인스턴스
  단순하게 표현하면 DAG의 실행 이력을 의미
  실행 중이거나 종료된 상태를 의미
- 원자성을 갖음 : 다른 리소스가 필요하지 않음
- 멱등성을 갖음 : 반복해도 동일한 결과를 보여줌

 

두번째, 세번째, 네번째 dagrun 이 실행하지 못하다가 5일날 모두 catchup 되었다면,

execution_date 는 원래 실행되었어야 했던 그 날의 execution_date 를 갖고

start_date 는 catchup 한 날짜(5일)이 됨

 

 

 

 

 

 

 

 

+ Recent posts