"Data Pipellines with Apache Airflow

Apache Airflow 기반의 데이터 파이프라인

에어플로 중심의 워크플로 구축에서 커스텀 컴포넌트 개발 및 배포, 관리까지"

책 내용 공부한 후, 나중에 다시 참고할 내용 필기해 둠

책은 Airflow 2.0.0 버전을 기준으로 쓰여졌지만,

나는 Airflow 2.2.2 버전에서 dag 구현 및 테스트 진행함

 

 

 


 

기본 DummyOperator, BashOperator, PythonOperator 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

def _func():
    print('hello python!')

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=['eyeballs'],
    params={"example_key": "example_value"},
) as dag:

    start = DummyOperator(
        task_id='dummy_operator',
    )

    bash_operator = BashOperator(
        task_id='bash_operator',
        bash_command='echo hello bash!',
    )

    python_operator = PythonOperator(
        task_id='python_operator',
        python_callable=_func,
    )

    start >> bash_operator >> python_operator

 

 


 

dag 에 schedule_interval 은 cron 으로 넣을 수 있지만,

@daily 등으로 넣을 수 있음 [참고]

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='@daily',
    start_date=datetime(2023, 6, 25),
) as dag:
preset meaning cron
None Don’t schedule, use for exclusively “externally triggered” DAGs  
@once Schedule once and only once  
@hourly Run once an hour at the beginning of the hour 매시간 0 * * * *
@daily Run once a day at midnight 매일 자정 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 매주 일요일 자정 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 매월 1일 자정 0 0 1 * *
@yearly Run once a year at midnight of January 1 매년 1월 1일 자정 0 0 1 1 *

 

 


 

 

종료 날짜를 지정하면, 종료 날짜까지만 dag 스케줄링을 진행

예를 들어 아래와 같이 1월 5일까지만 지정하면

execution_date 가 1월 5일 값을 갖는 날까지(즉 현실 시간으로 1월 6일) 동작

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 1, 5)
) as dag:

 


 

며칠 몇 시에 동작하도록 스케줄링하는 게 아니라

3시간마다 혹은 10분마다 스케줄링하도록 만들려면

아래 코드와 같이 dt.timedelta 를 사용하면 됨

3일마다 스케줄링 : dt.timedelta(days=3)

2시간마다 스케줄링 : dt.timedelta(hours=2)

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='dt.timedelta(days=3)',
    start_date=datetime(2023, 6, 25),
) as dag:

 


 

jinja template 진자 템플릿을 이용하여

airflow 가 기본적으로 갖고 있는 변수 중 하나인

execution_date 을 bash operator 에서 사용하는 샘플 코드

 

아래 dag 를 실행하는 실제 날짜는 10월 1일인 상황

 

코드 :

    b1 = BashOperator(
        task_id='b1',
        bash_command='''
            echo execution_date : {{execution_date}} ;
            echo ds : {{ds}} ;
            echo ds_nodash : {{ds_nodash}} ;
            echo execution_date with strftime : {{execution_date.strftime("%Y_%m_%d")}}
        '''
    )

결과 :

[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - execution_date : 2023-09-30T00:00:00+00:00
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - ds : 2023-09-30
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - ds_nodash : 20230930
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - execution_date with strftime : 2023_09_30

 

코드 :

    b2 = BashOperator(
        task_id='b2',
        bash_command='''
            echo next_execution_date : {{next_execution_date}} ;
            echo next_ds : {{next_ds}} ;
            echo next_ds_nodash : {{next_ds_nodash}} ;
        '''
    )

결과 :

[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_execution_date : 2023-10-01T00:00:00+00:00
[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_ds : 2023-10-01
[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_ds_nodash : 20231001

 

코드 :

    b3 = BashOperator(
        task_id='b3',
        bash_command='''
            echo prev_execution_date : {{prev_execution_date}} ;
            echo prev_ds : {{prev_ds}} ;
            echo prev_ds_nodash : {{prev_ds_nodash}}
        '''
    )

결과 :

[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_execution_date : 2023-09-29 00:00:00+00:00
[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_ds : 2023-09-29
[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_ds_nodash : 20230929

 

airflow 가 갖는 기본 변수 : https://airflow.apache.org/docs/apache-airflow/2.2.2/templates-ref.html

 

airflow 가 갖는 기본 변수는 다음과 같이 코드에서 직접 확인 가능

코드 :

def _func(**kwargs):
    for arg in kwargs:
        print(arg, ':', kwargs[arg])

...


p1 = PythonOperator(
    task_id='p1',
    python_callable=_func,
    dag=dag
)

결과 :

더보기
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conf : <airflow.configuration.AirflowConfigParser object at 0x7f9bc30a6eb8>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag : <DAG: execution_date>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag_run : <DagRun execution_date @ 2023-10-01 13:23:29.703212+00:00: manual__2023-10-01T13:23:29.703212+00:00, externally triggered: True>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_end : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_start : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - inlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - logical_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - macros : <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds_nodash' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - outlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - params : {'example_key': 'example_value'}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_start_success : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_end_success : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date_success : None
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_start_date_success : 2023-10-01T13:43:50.677389+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - run_id : manual__2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task : <Task(PythonOperator): p1>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance_key_str : execution_date__p1__20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - test_mode : False
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ti : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds : 2023-10-02
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds_nodash : 20231002
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash : 20231001T132329
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash_with_tz : 20231001T132329.703212+0000
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - var : {'json': None, 'value': None}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conn : <airflow.models.taskinstance.TaskInstance.get_template_context.<locals>.ConnectionAccessor object at 0x7f9bc06c3748>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds : 2023-09-30
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds_nodash : 20230930
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - templates_dict : None

 


 

airflow 2.2 버전부터 헷갈리는 execution_date 대신 data_interval_start 를 사용하도록 업데이트 됨

 

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/

기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end, next_logical_date
prev_execution_date prev_logical_date

 


 

Airflow dag 를 작성할 때 가장 중요하게 고려해야 하는 두 가지는 원자성멱등성

 

원자성 : Airflow 태스크 수행이 성공하려면 완벽하게 성공해야 하고, 실패하려면 완벽하게 실패해야 함

멱등성 : 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과가 동일해야 함

 

 


 

 

zero-padding : 빈 앞자리를 0으로 채움

예를 들어 1월이면 01, 8일이면 08, 4시면 04시

execution_date 에서 추출한 월, 일, 시간에 zero-padding 을 넣는 방법을 다음과 같음

 

코드 :

    b1 = BashOperator(
        task_id='b1',
        bash_command='''
            echo execution_date : {{execution_date}} ;
            echo execution_date.year: {{execution_date.year}} ;
            echo execution_date.month: {{execution_date.month}} ;
            echo month with zero-padding : {{ '{:02}'.format(execution_date.month) }} ;
            echo execution_date.day: {{execution_date.day}} ;
            echo day with zero-padding : {{ '{:02}'.format(execution_date.day) }} ;
            echo execution_date.hour: {{execution_date.hour}} ;
            echo hour with zero-padding : {{ '{:02}'.format(execution_date.hour) }} ;
        '''
    )

결과 :

execution_date : 2023-10-01T07:23:29.703212+00:00
execution_date.year: 2023
execution_date.month: 10
month with zero-padding : 10
execution_date.day: 1
day with zero-padding : 01
execution_date.hour: 7
hour with zero-padding : 07

 


 

 

PythonOperator 가 실행하는 함수에서 execution_date 를 사용하는 여러가지 방법

def _func1(execution_date):
    print('execution_date : ', execution_date)
    year, month, day, hour, *_ = execution_date.timetuple()
    print('year : ', year)
    print('month : ', month)
    print('day : ', day)
    print('hour : ', hour)

def _func2(**context):
    print('execution_date : ', context['execution_date'])

def _func3(execution_date, **context):
    print('execution_date : ', execution_date)
    print('execution_date' in context) #False

...

    p1 = PythonOperator(
        task_id='p1',
        python_callable=_func1,
    )

    p2 = PythonOperator(
        task_id='p2',
        python_callable=_func2,
    )

    p3 = PythonOperator(
        task_id='p3',
        python_callable=_func3,
    )

 

p3 처럼 execution_date 를 파라미터에 직접 넣었다면

**context 에 execution_date 값은 불포함

 


 

 

 

PythonOperator 가 실행하는 함수에 변수 넣는 방법

def _func1(greeting, name, date):
    print(f"{greeting}! I'm {name}. Today is {date}")

def _func2(param1, param2, param3, **context):
    print(f"{param1}! I'm {param2}. Today is {param3}")

...

    p1 = PythonOperator(
        task_id='p1',
        python_callable=_func1,
        op_kwargs={
            "greeting":"hello",
            "name":"eyeballs",
            "date":"{{execution_date}}"
        }
    )

    p2 = PythonOperator(
        task_id='p2',
        python_callable=_func2,
        op_args=["hello", "eyeballs", "{{execution_date}}"]
    )

func1 결과 :

[2023-10-02, 00:31:57 UTC] {logging_mixin.py:109} INFO - hello! I'm eyeballs. Today is 2023-10-01T00:00:00+00:00

func2 결과 :

[2023-10-02, 00:32:00 UTC] {logging_mixin.py:109} INFO - hello! I'm eyeballs. Today is 2023-10-01T00:00:00+00:00

 

아래와 같이 Rendered 를 누르면 Python Operator 에서 함수로 넘겨준 파라미터들을 볼 수 있음

 

 

 


Airflow Task 간 데이터를 교환/전송할 때 xcom 을 사용할 수 있음

하나의 Task 가 xcom(피클링)을 통해 데이터를 메타스토어에 저장하고,

다른 Task 에서 xcom 이 저장한 데이터를 메타스토어로부터 읽는 것으로 데이터를 교환함

 

피클 Picke 이란, Python 객체를 디스크에 저장할 수 있게 하는 직렬화 프로토콜

피클링된 객체는 메타스토어의 블록 blob 에 저장되기 때문에

일반적으로 문자열 몇 개와 같은 작은 데이터 전송에만 적용 가능

 

Task 간 좀 더 큰 데이터를 교환 할 때는 xcom 이 아닌 외부 데이터베이스를 사용해야 함

 

xcom 샘플 코드

def _push_xcom(**context):
    context["task_instance"].xcom_push(key="name", value="eyeballs")

def _pull_xcom(**context):
    name=context["task_instance"].xcom_pull(task_ids="push_op", key="name")
    print(f"Hello, {name}!")

...

    push_op = PythonOperator(
        task_id='push_op',
        python_callable=_push_xcom,
    )

    pull_op = PythonOperator(
        task_id='pull_op',
        python_callable=_pull_xcom,
    )

    push_op >> pull_op

위 코드에서 사용된 xcom_pull 은 현재 DAG 실행을 통해 게시된 값만 가져옴

다른 날짜에서 실행된 task 가 xcom_push 로 넣은 값을 (키값이 동일해도) 가져오지 않는다는 말임

 

 


 

의존성 fan-out, fan-in

a >> [b,c] >> d

 


트리거 규칙 Trigger Rule 이란, Task 가 트리거되는 기준을 의미함

즉, Task 가 언제 어떤 조건이 되었을 때 실행 될 것인지를 말 함

 

airflow Trigger Rule 문서 :

https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html?highlight=all_failed#trigger-rules 

  • all_success (default): All upstream tasks have succeeded
    모든 상위 태스크가 성공하면 트리거됨
  • all_failed: All upstream tasks are in a failed or upstream_failed state
    모든 상위 태스크가 실패하면 트리거됨
    태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드를 트리거하는 용도로 사용
  • all_done: All upstream tasks are done with their execution
    성공/실패 상관 없이 모든 상위 태스크가 실행을 마치면 트리거됨
    모든 상위 태스크 실행 완료 후 청소 코드 등을 실행하는 용도로 사용
  • one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
    하나 이상의 상위 태스크가 실패하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음
    알림 또는 롤백 같은 일부 오류 처리 코드를 빠르게 실행하기 위한 용도로 사용
  • one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
    하나 이상의 상위 태스크가 성공하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음
    하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알람을 빠르게 실행하기 위한 용도로 사용
  • none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
    실패한 상위 태스크는 없지만, 태스크가 성공 또는 건너 뛴 경우 트리거됨
    DAG 에 조건부 브랜치 구조가 있는 경우 사용
  • none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
  • none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
    건너뛴 상위 태스크는 없지만, 태스크가 성공 또는 실패한 경우 트리거됨
    모든 업스트림 태스크가 실행된 경우, 해당 결과를 무시하고 실행하기 위한 용도로 사용
  • always: No dependencies at all, run this task at any time
    상위 태스크 상태와 상관없이 트리거됨
    테스트 용도로 사용

 


 

센서는 특정 조건이 true 인지 지속적으로 확인하고 true 라면 성공

만약 특정 조건에 false 가 뜨면, 상태가 true 가 될 때 까지 혹은 지정된 timeout 이 날 때까지 계속 확인함

 

PythonSensor 의 경우, PythonSensor가 사용하는 함수의 return 값이 true 인지 false 인지를 계속 확인함

PythonSensor 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.operators.dummy import DummyOperator

def _sensor():
    if datetime.now().second == 30 : return True
    else : return False

with DAG(
    dag_id='sensor',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
) as dag:

    start_op = DummyOperator(
        task_id='start_op',
    )

    sensor_op = PythonSensor(
        task_id='sensor_op',
        python_callable=_sensor,
    )

    stop_op = DummyOperator(
        task_id='stop_op',
    )

    start_op >> sensor_op >> stop_op

 

 


 

 

sensor 가 무한정 기다리는 상황이 닥치게 되면

그 다음 스케줄링 때 다시 sensor 가 실행되고

그 다음 스케줄링 떄 다시 sensor 가 실행되고...

이렇게 sensor 가 끝나지 않고 계속 누적되어 동작하여

DAG 내의 최대 task 수 (default 16) 개에 도달하게 되면

새로운 task 가 실행 자체를 못 하는 경우가 발생함

 

따라서 DAG 내 최대 task 수를 조절하거나

sensor 의 timeout 을 하루 등으로 줄이거나

poke 모드 대신 reschedule 모드를 사용하여 포크 동작을 실행할 때만 task 슬롯을 차지하고

대기 시간 동안은 task 슬롯을 차지하지 않도록 만들어야 함

 


 

Dag 에서 다른 Dag 를 Trigger 할 수 있음

TriggerDagRunOperator 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='dag1',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
) as dag1:
    start_op = DummyOperator(
        task_id='start_op',
    )
    trigger_dag = TriggerDagRunOperator(
        task_id='trigger_dag',
        trigger_dag_id='dag2'
    )
    start_op >> trigger_dag 

with DAG(
    dag_id='dag2',
    schedule_interval=None,
    start_date=datetime(2023,6,25),
) as dag2:
    start_op = DummyOperator(
        task_id='start_op',
    )
    start_op

결과 : dag2 의 Schedule 은 None 임에도 불구하고, dag1 에 의해 호출되어 실행됨

 


 

Dag 간 fan-in 의존성 맞추는 방법은 시스템 상에서 제공되지 않음

예를 들어 dag1과 dag2가 모두 성공해야 dag3 이 트리거되는 방법은 없다는 말임

대신 xcom 등으로 dag1, dag2 의 성공 여부를 저장하고 dag3 에서 센서 등으로 결과값을 확인한다거나

ExternalTaskSensor 를 사용하여 dag1, dag2 의 상태를 꾸준히 확인하는 방법을 사용 가능함

 

Airflow ExternalTaskSensor Documentation : https://airflow.apache.org/docs/apache-airflow/2.2.2/howto/operator/external_task_sensor.html

 

ExternalTaskSensor 를 사용하는 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='dag1',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
) as dag1:
    dag1_op = DummyOperator(
        task_id='dag1_op',
    )

with DAG(
    dag_id='dag2',
    schedule_interval="0 0 * * *",
    start_date=datetime(2023,6,25),
    catchup=False,
) as dag2:
    dag2_op = DummyOperator(
        task_id='dag2_op',
    )

with DAG(
    dag_id='dag3',
    schedule_interval=None,
    start_date=datetime(2023,6,25),
    catchup=False,
) as dag3:
    dag1_sensor = ExternalTaskSensor(
        task_id = "dag1_sensor",
        external_dag_id = "dag1",
        external_task_id = "dag1_op"
    )
    dag2_sensor = ExternalTaskSensor(
        task_id = "dag2_sensor",
        external_dag_id = "dag2",
        external_task_id = "dag2_op"
    )
    start_op = DummyOperator(
        task_id='start_op',
    )
    [dag1_sensor, dag2_sensor] >> start_op

 


 

Task, DAG의 실행이 평소보다 오래 걸리는 상황을 파악하기 위해

SLA(Service-Level Agreement) 을 추가 가능

SLA 로 지정해 둔 제한 시간보다 실행 시간이 오래 걸린다면 경고를 주거나 python 함수 등을 실행하게 만듦

 

SLA는 Dag 및 Task 에 각각 설정 가능함

Task 의 시작 또는 종료 시간이 DAG 시작 시간고 비교하여 SLA 에 지정한 제한 시간을 넘겼는지 확인 할 것임

 

SLA 를 2초로 설정한 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

default_args={
    "sla":timedelta(seconds=2),
}

def sla_miss_callback(context):
    print("TOO LONG!")

def _func():
    sum = 0
    for i in range(99999999):
        sum = sum + i
    print(sum)

with DAG(
    dag_id='test',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
    sla_miss_callback=sla_miss_callback,
    default_args=default_args,
) as dag:

    start = DummyOperator(
        task_id='dummy_operator',
    )
    
    python = PythonOperator(
        task_id='python',
        python_callable=_func
    )

    start >> python

 

다음과 같이 Operator 에 적용할 수 있다고 함

    python = PythonOperator(
        task_id='python',
        python_callable=_func,
        sla=timedelta(seconds=2)
    )

 

DAG 의 시작 시간과 Task 의 종료 시간을 계속 비교한다고 함

주의 할 점은 비교하는 시작 시간이 Task 의 시작 시간이 아니라 DAG 의 시작 시간이라는 것

(개별 Task 가 아닌 DAG 시작 시간을 기준으로 SLA 가 정의되었기 때문)

 


sla 뿐만 아니라, Task 가 성공했을 때 혹은 Task 가 실패했을 때 callback 을 호출하도록 만들 수 있음

 

NameDescription
on_success_callback Invoked when the task succeeds
on_failure_callback Invoked when the task fails
sla_miss_callback Invoked when a task misses its defined SLA
on_retry_callback Invoked when the task is up for retry

 

Callback 사용 샘플 코드

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
) as dag:

    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3", on_success_callback=dag_success_alert)
    task1 >> task2 >> task3

 

 

Task 들을 그룹으로 묶어서

눈으로 보기에 좀 더 정렬되어 보이고 이해하기 쉽도록 만들 수 있음

 

 

Task Group 사용 샘플 코드

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# [START howto_task_group]
with DAG(
    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]
) as dag:
    start = DummyOperator(task_id="start")

    # [START howto_task_group_section_1]
    with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
        task_1 = DummyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = DummyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]
    # [END howto_task_group_section_1]

    # [START howto_task_group_section_2]
    with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
        task_1 = DummyOperator(task_id="task_1")

        # [START howto_task_group_inner_section_2]
        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = DummyOperator(task_id="task_3")
            task_4 = DummyOperator(task_id="task_4")

            [task_2, task_3] >> task_4
        # [END howto_task_group_inner_section_2]

    # [END howto_task_group_section_2]

    end = DummyOperator(task_id='end')

    start >> section_1 >> section_2 >> end

 

Airflow WebUI Graph 에서 보이는 화면

차례대로 Group 을 클릭하지 않았을 때, Group 을 클릭했을 때

 

+ Recent posts