[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conf : <airflow.configuration.AirflowConfigParser object at 0x7f9bc30a6eb8>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag : <DAG: execution_date>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - dag_run : <DagRun execution_date @ 2023-10-01 13:23:29.703212+00:00: manual__2023-10-01T13:23:29.703212+00:00, externally triggered: True>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_end : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - data_interval_start : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - inlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - logical_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - macros : <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end | ds_nodash' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - next_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - outlets : []
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - params : {'example_key': 'example_value'}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_start_success : 2023-09-30T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_data_interval_end_success : 2023-10-01T00:00:00+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds : 2023-10-01
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_ds_nodash : 20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_execution_date_success : None
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - prev_start_date_success : 2023-10-01T13:43:50.677389+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - run_id : manual__2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task : <Task(PythonOperator): p1>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - task_instance_key_str : execution_date__p1__20231001
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - test_mode : False
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ti : <TaskInstance: execution_date.p1 manual__2023-10-01T13:23:29.703212+00:00 [running]>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds : 2023-10-02
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - tomorrow_ds_nodash : 20231002
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts : 2023-10-01T13:23:29.703212+00:00
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash : 20231001T132329
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - ts_nodash_with_tz : 20231001T132329.703212+0000
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - var : {'json': None, 'value': None}
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - conn : <airflow.models.taskinstance.TaskInstance.get_template_context.<locals>.ConnectionAccessor object at 0x7f9bc06c3748>
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds : 2023-09-30
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - yesterday_ds_nodash : 20230930
[2023-10-01, 14:28:52 UTC] {logging_mixin.py:109} INFO - templates_dict : None
airflow 2.2 버전부터 헷갈리는 execution_date 대신 data_interval_start 를 사용하도록 업데이트 됨
all_success (default): All upstream tasks have succeeded 모든 상위 태스크가 성공하면 트리거됨
all_failed: All upstream tasks are in a failed or upstream_failed state 모든 상위 태스크가 실패하면 트리거됨 태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드를 트리거하는 용도로 사용
all_done: All upstream tasks are done with their execution 성공/실패 상관 없이 모든 상위 태스크가 실행을 마치면 트리거됨 모든 상위 태스크 실행 완료 후 청소 코드 등을 실행하는 용도로 사용
one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done) 하나 이상의 상위 태스크가 실패하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음 알림 또는 롤백 같은 일부 오류 처리 코드를 빠르게 실행하기 위한 용도로 사용
one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done) 하나 이상의 상위 태스크가 성공하자마자 트리거됨. 다른 상위 태스크의 실행 완료는 기다리지 않음 하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알람을 빠르게 실행하기 위한 용도로 사용
none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped 실패한 상위 태스크는 없지만, 태스크가 성공 또는 건너 뛴 경우 트리거됨 DAG 에 조건부 브랜치 구조가 있는 경우 사용
none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state 건너뛴 상위 태스크는 없지만, 태스크가 성공 또는 실패한 경우 트리거됨 모든 업스트림 태스크가 실행된 경우, 해당 결과를 무시하고 실행하기 위한 용도로 사용
always: No dependencies at all, run this task at any time 상위 태스크 상태와 상관없이 트리거됨 테스트 용도로 사용
센서는 특정 조건이 true 인지 지속적으로 확인하고 true 라면 성공
만약 특정 조건에 false 가 뜨면, 상태가 true 가 될 때 까지 혹은 지정된 timeout 이 날 때까지 계속 확인함
PythonSensor 의 경우, PythonSensor가 사용하는 함수의 return 값이 true 인지 false 인지를 계속 확인함
PythonSensor 샘플 코드
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.operators.dummy import DummyOperator
def _sensor():
if datetime.now().second == 30 : return True
else : return False
with DAG(
dag_id='sensor',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 6, 25),
catchup=False,
) as dag:
start_op = DummyOperator(
task_id='start_op',
)
sensor_op = PythonSensor(
task_id='sensor_op',
python_callable=_sensor,
)
stop_op = DummyOperator(
task_id='stop_op',
)
start_op >> sensor_op >> stop_op
sensor 가 무한정 기다리는 상황이 닥치게 되면
그 다음 스케줄링 때 다시 sensor 가 실행되고
그 다음 스케줄링 떄 다시 sensor 가 실행되고...
이렇게 sensor 가 끝나지 않고 계속 누적되어 동작하여
DAG 내의 최대 task 수 (default 16) 개에 도달하게 되면
새로운 task 가 실행 자체를 못 하는 경우가 발생함
따라서 DAG 내 최대 task 수를 조절하거나
sensor 의 timeout 을 하루 등으로 줄이거나
poke 모드 대신 reschedule 모드를 사용하여 포크 동작을 실행할 때만 task 슬롯을 차지하고
대기 시간 동안은 task 슬롯을 차지하지 않도록 만들어야 함
Dag 에서 다른 Dag 를 Trigger 할 수 있음
TriggerDagRunOperator 샘플 코드
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id='dag1',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 6, 25),
) as dag1:
start_op = DummyOperator(
task_id='start_op',
)
trigger_dag = TriggerDagRunOperator(
task_id='trigger_dag',
trigger_dag_id='dag2'
)
start_op >> trigger_dag
with DAG(
dag_id='dag2',
schedule_interval=None,
start_date=datetime(2023,6,25),
) as dag2:
start_op = DummyOperator(
task_id='start_op',
)
start_op
결과 : dag2 의 Schedule 은 None 임에도 불구하고, dag1 에 의해 호출되어 실행됨
Dag 간 fan-in 의존성 맞추는 방법은 시스템 상에서 제공되지 않음
예를 들어 dag1과 dag2가 모두 성공해야 dag3 이 트리거되는 방법은 없다는 말임
대신 xcom 등으로 dag1, dag2 의 성공 여부를 저장하고 dag3 에서 센서 등으로 결과값을 확인한다거나
ExternalTaskSensor 를 사용하여 dag1, dag2 의 상태를 꾸준히 확인하는 방법을 사용 가능함
ExternalTaskSensor 를 사용하는 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
with DAG(
dag_id='dag1',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 6, 25),
catchup=False,
) as dag1:
dag1_op = DummyOperator(
task_id='dag1_op',
)
with DAG(
dag_id='dag2',
schedule_interval="0 0 * * *",
start_date=datetime(2023,6,25),
catchup=False,
) as dag2:
dag2_op = DummyOperator(
task_id='dag2_op',
)
with DAG(
dag_id='dag3',
schedule_interval=None,
start_date=datetime(2023,6,25),
catchup=False,
) as dag3:
dag1_sensor = ExternalTaskSensor(
task_id = "dag1_sensor",
external_dag_id = "dag1",
external_task_id = "dag1_op"
)
dag2_sensor = ExternalTaskSensor(
task_id = "dag2_sensor",
external_dag_id = "dag2",
external_task_id = "dag2_op"
)
start_op = DummyOperator(
task_id='start_op',
)
[dag1_sensor, dag2_sensor] >> start_op
Task, DAG의 실행이 평소보다 오래 걸리는 상황을 파악하기 위해
SLA(Service-Level Agreement) 을 추가 가능
SLA 로 지정해 둔 제한 시간보다 실행 시간이 오래 걸린다면 경고를 주거나 python 함수 등을 실행하게 만듦
SLA는 Dag 및 Task 에 각각 설정 가능함
Task 의 시작 또는 종료 시간이 DAG 시작 시간고 비교하여 SLA 에 지정한 제한 시간을 넘겼는지 확인 할 것임
SLA 를 2초로 설정한 코드 예제 (테스트 진행되지 않았으니, 반드시 테스트 진행 후 사용해야 함)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
default_args={
"sla":timedelta(seconds=2),
}
def sla_miss_callback(context):
print("TOO LONG!")
def _func():
sum = 0
for i in range(99999999):
sum = sum + i
print(sum)
with DAG(
dag_id='test',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 6, 25),
catchup=False,
sla_miss_callback=sla_miss_callback,
default_args=default_args,
) as dag:
start = DummyOperator(
task_id='dummy_operator',
)
python = PythonOperator(
task_id='python',
python_callable=_func
)
start >> python
Difficult to draw any definite conclusion from this evidence aloneFrom the look of things, this will take much longer than we thought.I can figure out one conclusion from this truth.From : (~에) 기초하여, (~을) 토대로draw : (결론을) 내리다