Airflow 테스트를 위해

Docker 로 Airflow 설치하는 방법을 설명함

 

Centos 7 을 사용하였고, Airflow 는 Sequential Executor 를 사용함.

 

 

 

Centos 7 Docker Image : https://eyeballs.tistory.com/543

 

위 Docker Image 를 만들어준 다음, 아래 명령어로 container 하나 띄움

 

$ docker run --name airflow -p 8080:8080 --privileged -d mycentos:7 init

 

아래 명령어로 docker container 들어가서 필요한 것들 설치 진행

$ docker exec -it airflow bash
$ yum upgrade -y
$ yum install python3 wget vim sudo gcc make -y

 

아래 명령어로 sqlite 최신 버전 설치 [참고]

( sqlite 다운로드 페이지 :https://www.sqlite.org/download.html )

만약 sqlite3 --version 이 3.7.17

$ cd /opt
$ wget https://www.sqlite.org/2023/sqlite-autoconf-3430100.tar.gz
$ tar -xzf sqlite-autoconf-3430100.tar.gz 
$ cd sqlite-autoconf-3430100
$ make
$ make install

위처럼 sqlite3 를 최신 버전으로 업그레이드 하는 경우는

pip 로 airflow 설치시 3.8.3 이상의 sqlite 버전을 필요로 하기 때문.

can't find new sqlite version? (SQLite 3.8.3 or later is required (found 3.7.17))

만약 sqlite3 버전이 3.8.3 이상이라면 위와 같이 최신 버전으로 업그레이드 하지 않아도 됨

 

 

airflow 라는 사용자를 만들어서 sudo 권한 부여

$ useradd airflow
$ usermod -aG wheel airflow

 

아래 명령어로 /etc/sudoers 편집 진행

아래 주석 처리되어있는 부분의 주석을 해제

$ visudo
전) #%wheel  ALL=(ALL)       NOPASSWD: ALL
후) %wheel  ALL=(ALL)       NOPASSWD: ALL

 

아래 명령어로 사용자 airflow 로 접근

지금부터 나오는 명령어들은 모두 airflow 계정으로 진행

$ su - airflow

 

아래 명령어로 pip3 업그레이드 진행 및 setuptool 설치

$ sudo -H pip3 install --upgrade --ignore-installed pip setuptools

 

 

아래 명령어로 airflow home 을 만듦

$ mkdir ~/airflow
$ export AIRFLOW_HOME=~/airflow

 

아래 명령어로 sqlite3 가 최신 버전(>3.7.17)으로 업그레이드 되었는지 확인

$ pyhon3
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.7.17'
$ export LD_LIBRARY_PATH="/usr/local/lib"
$ python3
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.43.1'

 

아래 명령어로 airflow 설치

나는 2.2.2를 설치하고 싶어서 직접 2.2.2를 넣었으니, 원하는 버전을 넣으면 됨.

$ AIRFLOW_VERSION=2.2.2
$ PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ CONSTRAINT_URL=" https://raw.githubusercontent.com/apache/airflow/constraints-$ {AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

 

아래 명령어로 apache airflow 의 admin 유저 생성

$ airflow users  create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow

 

아래 명령어로 apache airflow 를 Sequential Executor 모드로 실행

$ airflow standalone

 

잠시 후 웹브라우저에서 localhost:8080 에 접근하여 WebUI 가 뜨는지 확인

id, pw 는 각각 airflow, airflow 로 접근 가능

 

dags 위치는 airflow.cfg 에서 찾아볼 수 있음 ($AIRFLOW_HOME/dags)

$ cat $AIRFLOW_HOME/airflow.cfg | grep dags_folder

 

 

 

 

 

 

 

참고

https://airflow.apache.org/docs/apache-airflow/2.2.2/start/local.html

https://www.webdesignsun.com/insights/upgrading-sqlite-on-centos/

https://musclebear.tistory.com/131

https://sun2day.tistory.com/216

 

"Data Pipellines with Apache Airflow

Apache Airflow 기반의 데이터 파이프라인

에어플로 중심의 워크플로 구축에서 커스텀 컴포넌트 개발 및 배포, 관리까지"

책 내용 공부한 후, 나중에 다시 참고할 내용 필기해 둠

책은 Airflow 2.0.0 버전을 기준으로 쓰여졌지만,

나는 Airflow 2.2.2 버전에서 dag 구현 및 테스트 진행함

 

 

 


 

기본 DummyOperator, BashOperator, PythonOperator 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

def _func():
    print('hello python!')

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=['eyeballs'],
    params={"example_key": "example_value"},
) as dag:

    start = DummyOperator(
        task_id='dummy_operator',
    )

    bash_operator = BashOperator(
        task_id='bash_operator',
        bash_command='echo hello bash!',
    )

    python_operator = PythonOperator(
        task_id='python_operator',
        python_callable=_func,
    )

    start >> bash_operator >> python_operator

 

 


 

dag 에 schedule_interval 은 cron 으로 넣을 수 있지만,

@daily 등으로 넣을 수 있음 [참고]

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='@daily',
    start_date=datetime(2023, 6, 25),
) as dag:
preset meaning cron
None Don’t schedule, use for exclusively “externally triggered” DAGs  
@once Schedule once and only once  
@hourly Run once an hour at the beginning of the hour 매시간 0 * * * *
@daily Run once a day at midnight 매일 자정 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 매주 일요일 자정 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 매월 1일 자정 0 0 1 * *
@yearly Run once a year at midnight of January 1 매년 1월 1일 자정 0 0 1 1 *

 

 


 

 

종료 날짜를 지정하면, 종료 날짜까지만 dag 스케줄링을 진행

예를 들어 아래와 같이 1월 5일까지만 지정하면

execution_date 가 1월 5일 값을 갖는 날까지(즉 현실 시간으로 1월 6일) 동작

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 1, 5)
) as dag:

 


 

며칠 몇 시에 동작하도록 스케줄링하는 게 아니라

3시간마다 혹은 10분마다 스케줄링하도록 만들려면

아래 코드와 같이 dt.timedelta 를 사용하면 됨

3일마다 스케줄링 : dt.timedelta(days=3)

2시간마다 스케줄링 : dt.timedelta(hours=2)

with DAG(
    dag_id='eyeballs_dag',
    schedule_interval='dt.timedelta(days=3)',
    start_date=datetime(2023, 6, 25),
) as dag:

 


 

jinja template 진자 템플릿을 이용하여

airflow 가 기본적으로 갖고 있는 변수 중 하나인

execution_date 을 bash operator 에서 사용하는 샘플 코드

 

아래 dag 를 실행하는 실제 날짜는 10월 1일인 상황

 

코드 :

    b1 = BashOperator(
        task_id='b1',
        bash_command='''
            echo execution_date : {{execution_date}} ;
            echo ds : {{ds}} ;
            echo ds_nodash : {{ds_nodash}} ;
            echo execution_date with strftime : {{execution_date.strftime("%Y_%m_%d")}}
        '''
    )

결과 :

[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - execution_date : 2023-09-30T00:00:00+00:00
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - ds : 2023-09-30
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - ds_nodash : 20230930
[2023-10-01, 13:43:54 UTC] {subprocess.py:89} INFO - execution_date with strftime : 2023_09_30

 

코드 :

    b2 = BashOperator(
        task_id='b2',
        bash_command='''
            echo next_execution_date : {{next_execution_date}} ;
            echo next_ds : {{next_ds}} ;
            echo next_ds_nodash : {{next_ds_nodash}} ;
        '''
    )

결과 :

[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_execution_date : 2023-10-01T00:00:00+00:00
[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_ds : 2023-10-01
[2023-10-01, 13:43:58 UTC] {subprocess.py:89} INFO - next_ds_nodash : 20231001

 

코드 :

    b3 = BashOperator(
        task_id='b3',
        bash_command='''
            echo prev_execution_date : {{prev_execution_date}} ;
            echo prev_ds : {{prev_ds}} ;
            echo prev_ds_nodash : {{prev_ds_nodash}}
        '''
    )

결과 :

[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_execution_date : 2023-09-29 00:00:00+00:00
[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_ds : 2023-09-29
[2023-10-01, 13:44:01 UTC] {subprocess.py:89} INFO - prev_ds_nodash : 20230929

 

airflow 가 갖는 기본 변수 : https://airflow.apache.org/docs/apache-airflow/2.2.2/templates-ref.html

 

airflow 가 갖는 기본 변수는 다음과 같이 코드에서 직접 확인 가능

코드 :

def _func(**kwargs):
    for arg in kwargs:
        print(arg, ':', kwargs[arg])

...


p1 = PythonOperator(
    task_id='p1',
    python_callable=_func,
    dag=dag
)

결과 :

더보기
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conf : <airflow.configuration.AirflowConfigParser object at 0x7f9bc30a6eb8>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag : <DAG: execution_date>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag_run : <DagRun execution_date @ 2023-10-01 13:23:29.703212+00:00: manual__2023-10-01T13:23:29.703212+00:00, externally triggered: True>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_end : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_start : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - inlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - logical_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - macros : <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds_nodash' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - outlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - params : {'example_key': 'example_value'}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_start_success : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_end_success : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date_success : None
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_start_date_success : 2023-10-01T13:43:50.677389+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - run_id : manual__2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task : <Task(PythonOperator): p1>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance_key_str : execution_date__p1__20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - test_mode : False
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ti : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds : 2023-10-02
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds_nodash : 20231002
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash : 20231001T132329
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash_with_tz : 20231001T132329.703212+0000
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - var : {'json': None, 'value': None}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conn : <airflow.models.taskinstance.TaskInstance.get_template_context.<locals>.ConnectionAccessor object at 0x7f9bc06c3748>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds : 2023-09-30
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds_nodash : 20230930
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - templates_dict : None

 


 

airflow 2.2 버전부터 헷갈리는 execution_date 대신 data_interval_start 를 사용하도록 업데이트 됨

 

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/

기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end, next_logical_date
prev_execution_date prev_logical_date

 


 

Airflow dag 를 작성할 때 가장 중요하게 고려해야 하는 두 가지는 원자성멱등성

 

원자성 : Airflow 태스크 수행이 성공하려면 완벽하게 성공해야 하고, 실패하려면 완벽하게 실패해야 함

멱등성 : 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과가 동일해야 함

 

 


 

 

zero-padding : 빈 앞자리를 0으로 채움

예를 들어 1월이면 01, 8일이면 08, 4시면 04시

execution_date 에서 추출한 월, 일, 시간에 zero-padding 을 넣는 방법을 다음과 같음

 

코드 :

    b1 = BashOperator(
        task_id='b1',
        bash_command='''
            echo execution_date : {{execution_date}} ;
            echo execution_date.year: {{execution_date.year}} ;
            echo execution_date.month: {{execution_date.month}} ;
            echo month with zero-padding : {{ '{:02}'.format(execution_date.month) }} ;
            echo execution_date.day: {{execution_date.day}} ;
            echo day with zero-padding : {{ '{:02}'.format(execution_date.day) }} ;
            echo execution_date.hour: {{execution_date.hour}} ;
            echo hour with zero-padding : {{ '{:02}'.format(execution_date.hour) }} ;
        '''
    )

결과 :

execution_date : 2023-10-01T07:23:29.703212+00:00
execution_date.year: 2023
execution_date.month: 10
month with zero-padding : 10
execution_date.day: 1
day with zero-padding : 01
execution_date.hour: 7
hour with zero-padding : 07

 


 

 

PythonOperator 가 실행하는 함수에서 execution_date 를 사용하는 여러가지 방법

def _func1(execution_date):
    print('execution_date : ', execution_date)
    year, month, day, hour, *_ = execution_date.timetuple()
    print('year : ', year)
    print('month : ', month)
    print('day : ', day)
    print('hour : ', hour)

def _func2(**context):
    print('execution_date : ', context['execution_date'])

def _func3(execution_date, **context):
    print('execution_date : ', execution_date)
    print('execution_date' in context) #False

...

    p1 = PythonOperator(
        task_id='p1',
        python_callable=_func1,
    )

    p2 = PythonOperator(
        task_id='p2',
        python_callable=_func2,
    )

    p3 = PythonOperator(
        task_id='p3',
        python_callable=_func3,
    )

 

p3 처럼 execution_date 를 파라미터에 직접 넣었다면

**context 에 execution_date 값은 불포함

 


 

 

 

PythonOperator 가 실행하는 함수에 변수 넣는 방법

def _func1(greeting, name, date):
    print(f"{greeting}! I'm {name}. Today is {date}")

def _func2(param1, param2, param3, **context):
    print(f"{param1}! I'm {param2}. Today is {param3}")

...

    p1 = PythonOperator(
        task_id='p1',
        python_callable=_func1,
        op_kwargs={
            "greeting":"hello",
            "name":"eyeballs",
            "date":"{{execution_date}}"
        }
    )

    p2 = PythonOperator(
        task_id='p2',
        python_callable=_func2,
        op_args=["hello", "eyeballs", "{{execution_date}}"]
    )

func1 결과 :

[2023-10-02, 00:31:57 UTC] {logging_mixin.py:109} INFO - hello! I'm eyeballs. Today is 2023-10-01T00:00:00+00:00

func2 결과 :

[2023-10-02, 00:32:00 UTC] {logging_mixin.py:109} INFO - hello! I'm eyeballs. Today is 2023-10-01T00:00:00+00:00

 

아래와 같이 Rendered 를 누르면 Python Operator 에서 함수로 넘겨준 파라미터들을 볼 수 있음

 

 

 


Airflow Task 간 데이터를 교환/전송할 때 xcom 을 사용할 수 있음

하나의 Task 가 xcom(피클링)을 통해 데이터를 메타스토어에 저장하고,

다른 Task 에서 xcom 이 저장한 데이터를 메타스토어로부터 읽는 것으로 데이터를 교환함

 

피클 Picke 이란, Python 객체를 디스크에 저장할 수 있게 하는 직렬화 프로토콜

피클링된 객체는 메타스토어의 블록 blob 에 저장되기 때문에

일반적으로 문자열 몇 개와 같은 작은 데이터 전송에만 적용 가능

 

Task 간 좀 더 큰 데이터를 교환 할 때는 xcom 이 아닌 외부 데이터베이스를 사용해야 함

 

xcom 샘플 코드

def _push_xcom(**context):
    context["task_instance"].xcom_push(key="name", value="eyeballs")

def _pull_xcom(**context):
    name=context["task_instance"].xcom_pull(task_ids="push_op", key="name")
    print(f"Hello, {name}!")

...

    push_op = PythonOperator(
        task_id='push_op',
        python_callable=_push_xcom,
    )

    pull_op = PythonOperator(
        task_id='pull_op',
        python_callable=_pull_xcom,
    )

    push_op >> pull_op

위 코드에서 사용된 xcom_pull 은 현재 DAG 실행을 통해 게시된 값만 가져옴

다른 날짜에서 실행된 task 가 xcom_push 로 넣은 값을 (키값이 동일해도) 가져오지 않는다는 말임

 

 


 

의존성 fan-out, fan-in

a >> [b,c] >> d

 


트리거 규칙 Trigger Rule 이란, Task 가 트리거되는 기준을 의미함

즉, Task 가 언제 어떤 조건이 되었을 때 실행 될 것인지를 말 함

 

airflow Trigger Rule 문서 :

https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html?highlight=all_failed#trigger-rules 

  • all_success (default): All upstream tasks have succeeded
    모든 상위 태스크가 성공하면 트리거됨
  • all_failed: All upstream tasks are in a failed or upstream_failed state
    모든 상위 태스크가 실패하면 트리거됨
    태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드를 트리거하는 용도로 사용
  • all_done: All upstream tasks are done with their execution
    성공/실패 상관 없이 모든 상위 태스크가 실행을 마치면 트리거됨
    모든 상위 태스크 실행 완료 후 청소 코드 등을 실행하는 용도로 사용
  • one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
    하나 이상의 상위 태스크가 실패하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음
    알림 또는 롤백 같은 일부 오류 처리 코드를 빠르게 실행하기 위한 용도로 사용
  • one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
    하나 이상의 상위 태스크가 성공하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음
    하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알람을 빠르게 실행하기 위한 용도로 사용
  • none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
    실패한 상위 태스크는 없지만, 태스크가 성공 또는 건너 뛴 경우 트리거됨
    DAG 에 조건부 브랜치 구조가 있는 경우 사용
  • none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
  • none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
    건너뛴 상위 태스크는 없지만, 태스크가 성공 또는 실패한 경우 트리거됨
    모든 업스트림 태스크가 실행된 경우, 해당 결과를 무시하고 실행하기 위한 용도로 사용
  • always: No dependencies at all, run this task at any time
    상위 태스크 상태와 상관없이 트리거됨
    테스트 용도로 사용

 


 

센서는 특정 조건이 true 인지 지속적으로 확인하고 true 라면 성공

만약 특정 조건에 false 가 뜨면, 상태가 true 가 될 때 까지 혹은 지정된 timeout 이 날 때까지 계속 확인함

 

PythonSensor 의 경우, PythonSensor가 사용하는 함수의 return 값이 true 인지 false 인지를 계속 확인함

PythonSensor 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.operators.dummy import DummyOperator

def _sensor():
    if datetime.now().second == 30 : return True
    else : return False

with DAG(
    dag_id='sensor',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
) as dag:

    start_op = DummyOperator(
        task_id='start_op',
    )

    sensor_op = PythonSensor(
        task_id='sensor_op',
        python_callable=_sensor,
    )

    stop_op = DummyOperator(
        task_id='stop_op',
    )

    start_op >> sensor_op >> stop_op

 

 


 

 

sensor 가 무한정 기다리는 상황이 닥치게 되면

그 다음 스케줄링 때 다시 sensor 가 실행되고

그 다음 스케줄링 떄 다시 sensor 가 실행되고...

이렇게 sensor 가 끝나지 않고 계속 누적되어 동작하여

DAG 내의 최대 task 수 (default 16) 개에 도달하게 되면

새로운 task 가 실행 자체를 못 하는 경우가 발생함

 

따라서 DAG 내 최대 task 수를 조절하거나

sensor 의 timeout 을 하루 등으로 줄이거나

poke 모드 대신 reschedule 모드를 사용하여 포크 동작을 실행할 때만 task 슬롯을 차지하고

대기 시간 동안은 task 슬롯을 차지하지 않도록 만들어야 함

 


 

Dag 에서 다른 Dag 를 Trigger 할 수 있음

TriggerDagRunOperator 샘플 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='dag1',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
) as dag1:
    start_op = DummyOperator(
        task_id='start_op',
    )
    trigger_dag = TriggerDagRunOperator(
        task_id='trigger_dag',
        trigger_dag_id='dag2'
    )
    start_op >> trigger_dag 

with DAG(
    dag_id='dag2',
    schedule_interval=None,
    start_date=datetime(2023,6,25),
) as dag2:
    start_op = DummyOperator(
        task_id='start_op',
    )
    start_op

결과 : dag2 의 Schedule 은 None 임에도 불구하고, dag1 에 의해 호출되어 실행됨

 


 

Dag 간 fan-in 의존성 맞추는 방법은 시스템 상에서 제공되지 않음

예를 들어 dag1과 dag2가 모두 성공해야 dag3 이 트리거되는 방법은 없다는 말임

대신 xcom 등으로 dag1, dag2 의 성공 여부를 저장하고 dag3 에서 센서 등으로 결과값을 확인한다거나

ExternalTaskSensor 를 사용하여 dag1, dag2 의 상태를 꾸준히 확인하는 방법을 사용 가능함

 

Airflow ExternalTaskSensor Documentation : https://airflow.apache.org/docs/apache-airflow/2.2.2/howto/operator/external_task_sensor.html

 

ExternalTaskSensor 를 사용하는 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='dag1',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
) as dag1:
    dag1_op = DummyOperator(
        task_id='dag1_op',
    )

with DAG(
    dag_id='dag2',
    schedule_interval="0 0 * * *",
    start_date=datetime(2023,6,25),
    catchup=False,
) as dag2:
    dag2_op = DummyOperator(
        task_id='dag2_op',
    )

with DAG(
    dag_id='dag3',
    schedule_interval=None,
    start_date=datetime(2023,6,25),
    catchup=False,
) as dag3:
    dag1_sensor = ExternalTaskSensor(
        task_id = "dag1_sensor",
        external_dag_id = "dag1",
        external_task_id = "dag1_op"
    )
    dag2_sensor = ExternalTaskSensor(
        task_id = "dag2_sensor",
        external_dag_id = "dag2",
        external_task_id = "dag2_op"
    )
    start_op = DummyOperator(
        task_id='start_op',
    )
    [dag1_sensor, dag2_sensor] >> start_op

 


 

Task, DAG의 실행이 평소보다 오래 걸리는 상황을 파악하기 위해

SLA(Service-Level Agreement) 을 추가 가능

SLA 로 지정해 둔 제한 시간보다 실행 시간이 오래 걸린다면 경고를 주거나 python 함수 등을 실행하게 만듦

 

SLA는 Dag 및 Task 에 각각 설정 가능함

Task 의 시작 또는 종료 시간이 DAG 시작 시간고 비교하여 SLA 에 지정한 제한 시간을 넘겼는지 확인 할 것임

 

SLA 를 2초로 설정한 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

default_args={
    "sla":timedelta(seconds=2),
}

def sla_miss_callback(context):
    print("TOO LONG!")

def _func():
    sum = 0
    for i in range(99999999):
        sum = sum + i
    print(sum)

with DAG(
    dag_id='test',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 6, 25),
    catchup=False,
    sla_miss_callback=sla_miss_callback,
    default_args=default_args,
) as dag:

    start = DummyOperator(
        task_id='dummy_operator',
    )
    
    python = PythonOperator(
        task_id='python',
        python_callable=_func
    )

    start >> python

 

다음과 같이 Operator 에 적용할 수 있다고 함

    python = PythonOperator(
        task_id='python',
        python_callable=_func,
        sla=timedelta(seconds=2)
    )

 

DAG 의 시작 시간과 Task 의 종료 시간을 계속 비교한다고 함

주의 할 점은 비교하는 시작 시간이 Task 의 시작 시간이 아니라 DAG 의 시작 시간이라는 것

(개별 Task 가 아닌 DAG 시작 시간을 기준으로 SLA 가 정의되었기 때문)

 


sla 뿐만 아니라, Task 가 성공했을 때 혹은 Task 가 실패했을 때 callback 을 호출하도록 만들 수 있음

 

NameDescription
on_success_callback Invoked when the task succeeds
on_failure_callback Invoked when the task fails
sla_miss_callback Invoked when a task misses its defined SLA
on_retry_callback Invoked when the task is up for retry

 

Callback 사용 샘플 코드

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
) as dag:

    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3", on_success_callback=dag_success_alert)
    task1 >> task2 >> task3

 

 

Task 들을 그룹으로 묶어서

눈으로 보기에 좀 더 정렬되어 보이고 이해하기 쉽도록 만들 수 있음

 

 

Task Group 사용 샘플 코드

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# [START howto_task_group]
with DAG(
    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]
) as dag:
    start = DummyOperator(task_id="start")

    # [START howto_task_group_section_1]
    with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
        task_1 = DummyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = DummyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]
    # [END howto_task_group_section_1]

    # [START howto_task_group_section_2]
    with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
        task_1 = DummyOperator(task_id="task_1")

        # [START howto_task_group_inner_section_2]
        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = DummyOperator(task_id="task_3")
            task_4 = DummyOperator(task_id="task_4")

            [task_2, task_3] >> task_4
        # [END howto_task_group_inner_section_2]

    # [END howto_task_group_section_2]

    end = DummyOperator(task_id='end')

    start >> section_1 >> section_2 >> end

 

Airflow WebUI Graph 에서 보이는 화면

차례대로 Group 을 클릭하지 않았을 때, Group 을 클릭했을 때

 

 

전치사 활용한 표현

 

 

Im speaking from my own experience.

Difficult to draw any definite conclusion from this evidence aloneFrom the look of things, this will take much longer than we thought.I can figure out one conclusion from this truth.From : (~에) 기초하여, (~을) 토대로draw : (결론을) 내리다

From the look of things : 상황을 보니..

 

 

Don't drink milk from the milkbox.

She drank water from the bottle.

My cat likes drinking from the faucet!

From : (~에) 대로 (마시는)

https://kr.freepik.com/premium-photo/cat-fiona-drinking-water-from-tap_49494254.htm

 

 

You can build yourself a nice PC for 300 dollars.

You can buy a nice house on the island for 20 billion dollars

For : (~의 돈을) 들이면

 

 

I'm all for the idea! 그 아이디어에 찬성!

Are you for or against the proposal??

For : (~에) 찬성하는

 

 

I'm in a yoga class.

She was in the univ.

My child is in middle school.

When I was in colleage, I waited tables at a family restaurant.

In : (~에) 재학중인, 다니고 있는

wait tables : 종업원으로 일하다

 

Usually I'm in bed by 9

She is in bed now.

In bed : 잠자리에 든

 

 

It's not in our interest to use your service.

This law is not in the interest of consumers.

In the interest : (~의) 이익에 부합하는

 

I'm in love.

She is in a coma.

Are you in pain?

The economy is in a deep slump...

In : (~에) 빠져있는

 

 

She looked at herself in the mirror.

He caught a glimpse of a beautiful woman in the mirror.

In the mirror : 거울에 비친

catch a glimpse of : ~을 흘끗 보다, 언뜻 보다

glimpse : (완전히는 못 보고) 잠깐[언뜻] 봄, 일별


I work in the IT field.

He is in the Internet business!

My family has been in the food business for generations.

In : ~의 사업을 하고 있는, ~의 분야에 종사하는

for generations : 여러 세대 동안

 

'English' 카테고리의 다른 글

Study English 24.05.31  (0) 2024.05.31
Study English 24.05.30  (0) 2024.05.30
Study English 23.09.04  (0) 2023.09.19
Study English 23.09.03 구동사 모음9  (0) 2023.09.17
Study English 23.09.02  (0) 2023.09.16

전치사 활용한 표현

 

 

I hurried after him to the front door.

I ran out after him, but he was out of sight.

After : (~을) 뒤따라

 

We earned 50,000 bucks last month after expenses.

My net income after tax and insurance is about 70 to 80 percent of my wage.

After : (계산에서 ~을) 빼고, 제외하고

real income : 실질 소득

net income : 순소득

 

The police are after him

I thought she was just a reporter after a story.

I'm just a reporter after a scoop.

I don't know what he is after.

After : (~을) 쫓는, 노리는

scoop : 특종

 

So I decided to drop out of colleage against his parents' wishes and trust that it would all work out OK.

He kept on drinking alcohol against doctor's warnings.

Against : (~을) 거스르는, 무시하는

 

Bundle up against the cold this winter.

About two in five koreans have been vaccinated against the new flu.

Against : (~에) 대비하는, (~을) 막는

two in five : 다섯명 중 두명

Bundle up : 옷을 두껍게 입다

 

Most images look better against black.

This is a story set against the American Revolution

Against : (~을) 배경으로 하는

 

I was shocked at the price.

I was surprised at the news.

At : (~을) 보고, 듣고

 

We are a nation at war.

I was at breakfast with a client this morning.

I saw some children at play in the garden.

They are at it again!

While you are at it, could you get me some snacks?

At : (~을) 하고 있는 중인

 

 

I'm between Android and Apple.

I'm between Disney Plus and Netflix.

Between : (두 가지를 놓고) 고민 중인

 

I'm between jobs. 전 직장 그만두고 새 직장을 구하는 중

I'm between books. 책 하나 쓰고 다음 책 구상 중

I'm between classes. 수업 하나 듣고 다음 수업 기다리는 중

I'm between projects. 프로젝트 하나 끝내고 다음 프로젝트 기다리는 중

Between : 일을 끝내고 다른 일을 기다리고 있는

 

 

'English' 카테고리의 다른 글

Study English 24.05.30  (0) 2024.05.30
Study English 23.09.05  (0) 2023.09.19
Study English 23.09.03 구동사 모음9  (0) 2023.09.17
Study English 23.09.02  (0) 2023.09.16
Study English 23.09.01  (0) 2023.09.16

아래 유튜브에서 내게 필요한 내용 필기

https://youtu.be/Z8tPysPfHOc?si=nF1DWhMFam9tIPfn 

 

 

 

He is ready to pick on me for no reason.

Pick on : 비난하다

 

She always picks on her friends.

Pick on : 괴롭히다

 

Stick around, I need your help.

Stick around : (어떤 곳에서) 가지 않고 머무르다.

 

She talked me into going to the party together.

Talk into : 설득하다

 

I will see if I can talk some sense into her.

Talk into : ~하게 하다

알아듣게 타이를께

 

Everyone left, but I choose to stay behind.

Stay behind : (뒤에) 남다, 출발하지 않다

 

Please stay behind the safety line!

Stay behind : (특정 위치) 뒤에 머무르다

 

Do you want to come over after work?

Come over : 방문하다, 찾아오다

우리집 올래?

 

Please keep it down!!!

Keep down : 낮추다, 조용히하다

조용해!!!

 

Don't stir up trouble 문제를 일으키지 마

Stir up nostalgia. 향수를 일으키다

Stir up : (문제, 향수 등)을 일으키다

 

The origin of Halloween is to ward off evil spirits.

Ward off : 물리치다, 피하다, 방어하다

 

The expectations of others can often weigh a person down.

Weigh down : 부담되게 하다, 마음을 누르다

 

Bottles and debris washed up on the shore.

Wash up : (바다, 강에 떠내려가서 어떤 장소에) 도달하다

debris : 잔해, 쓰레기

shore : 기슭

 

His behavior really cut her up.

Cut up : (물리적으로, 정서적으로) 상처를 주다

 

He always cuts up in class.

Cut up : 장난치다, 농담하다

 

The festival kicked off with a parade.

Kick off : (행사, 이벤트, 경기를) 시작하다

 

I don't have work tomorrow, so I'm gonna sleep in.

Sleep in : 늦게까지 자다, 늦잠자다

 

The hurricane wiped out entire villages.

Wipe out : (대규모 파괴, 소멸) 완전히 파괴하다, 없애다

 

Scroll down to see more content.

Scroll up/down : 화면을 올리다/내리다

 

I need to change my password; I think I've been hacked into.

Hack into : (~에) 침투하다

 

As night fell, the noises of the city began to die away.

Die away : 서서히 사라지다, 잦아들다

night fell : 밤이 오다

nightfall : 해질녘(=dust)

 

I need to draw out some cash from the ATM.

Draw out : 인출하다(=withdraw)

입금하다 : deposit, put in

 

The meeting was drawn out for another hour.

Draw out : (평상시보다 혹은 필요 이상으로 ~을) 길게 하다, 길게 끌다

 

Winter seems to be setting in early this year.....

Set in : 시작되다, 찾아오다

 

We set aside some money for rainy days.

Set aside : 따로 빼두다.

for rainy days : 비상시를 위해

 

The court set aside the previous verdict.

Set aside : 취소하다, 무시하다

verdict : 판결

 

Set aside the leftovers in the fridge.

Set aside : 저장하다, 보관하다

 

He couldn't shake off his anxiety.

He couldn't shake off his worries about the future....

Shake off : 감정이나 생각을 버리다.

 

She managed to shake off the reporters who were following her.

Shake off : ~를 따돌리다

manage to : ~하도록 애쓰다

 

When he saw the police coming, he run off...!!

Run off : 급히 달아나다

 

Online sales account for 20% of our company's total revenue.

Account for : (비율, 수량 등을) 차지하다, 설명하다

revenue : 수익

 

No worries, I won't let on your secret!

Let on : (비밀 등을) 말하다

 

 

 

 

'English' 카테고리의 다른 글

Study English 23.09.05  (0) 2023.09.19
Study English 23.09.04  (0) 2023.09.19
Study English 23.09.02  (0) 2023.09.16
Study English 23.09.01  (0) 2023.09.16
Study English 23.08.31  (0) 2023.09.15

+ Recent posts