환경 : ubuntu 위에서 동작하는 centos7 도커 컨테이너 2대

버전 : postgresql 13

 

Dockerfile

FROM centos:7
ENV container docker
RUN (cd /lib/systemd/system/sysinit.target.wants/; for i in *; do [ $i == \
  systemd-tmpfiles-setup.service ] || rm -f $i; done); \
  rm -f /lib/systemd/system/multi-user.target.wants/*;\
  rm -f /etc/systemd/system/*.wants/*;\
  rm -f /lib/systemd/system/local-fs.target.wants/*; \
  rm -f /lib/systemd/system/sockets.target.wants/*udev*; \
  rm -f /lib/systemd/system/sockets.target.wants/*initctl*; \
  rm -f /lib/systemd/system/basic.target.wants/*;\
  rm -f /lib/systemd/system/anaconda.target.wants/*;\
  echo "sslverify=false" >> /etc/yum.conf;\
  yum install -y wget sudo;\
  wget https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm --no-check-certificate;\
  sudo yum install -y pgdg-redhat-repo-latest.noarch.rpm;\
  sudo yum install -y postgresql13-server;
VOLUME [ "/sys/fs/cgroup" ]
COPY ./start-postgresql13.sh ./start-postgresql13.sh
RUN chmod 777 ./start-postgresql13.sh
CMD [ "./start-postgresql13.sh" ]

 

start-postgresql13.sh

#!/bin/bash

sudo /usr/pgsql-13/bin/postgresql-13-setup initdb
sudo systemctl enable postgresql-13
sudo systemctl start postgresql-13

 

docker-compose

version: '3.3'

services:
  active:
    image: postcent:7
    container_name: p1
    privileged: true
    stdin_open: true
    tty: true
    entrypoint: init

  standby:
    image: postcent:7
    container_name: p2
    privileged: true
    stdin_open: true
    tty: true
    entrypoint: init

 

 

Dockerfile 을 다음과 같이 빌드하여 이미지를 만듦

sudo docker build --tag postcent:7 .

 

docker-compose 명령어로 두 centos7 서버를 실행함

sudo docker-compose up -d

 

p1, p2 컨테이너로 각각 들어감

sudo docker exec -it p1 bash
sudo docker exec -it p2 bash

 

p1, p2 에서 아래 shell script 실행하여 postgresql 13 실행

/start-postgresql13.sh

 

p1 은 active 로 사용할 예정이고

p2 는 standby (read-only) 로 사용할 예정

 

 

 

< p1 >

 

p1 에서 아래 작업 진행 

아래 경로로 이동

cd /var/lib/pgsql/13/data

 

postgres.conf 에 아래 추가

listen_addresses = '*'
wal_level = hot_standby
max_wal_senders = 2
max_replication_slots = 2

 

pg_hba.conf 에 아래 추가

host all all 0.0.0.0/0 md5
host replication replication 0.0.0.0/0 md5

두번째 0.0.0.0/0 에 standby 의 ip 가 들어가도 됨

 

 

사용자 계정을 postgres 로 변경

su postgres

 

postgresql 접속

psql


replication 전용 유저 생성

create user replication replication password 'pwd';

 

확인

\du

 

\q 명령어로 psql 빠져나옴

exit 명령어로 postgres 유저에서 root 유저로 나옴

 

postgresql13 재시작

systemctl restart postgresql-13

 

 

혹시 아래와 같은 에러가 난다면, conf 파일을 제대로 읽지 못해서 생김

(위에서 수정한 conf 파일이 잘못되었거나, conf 파일 접근 권한이 없거나, 혹은 소유자가 postgres 가 아니거나 등)

Job for postgresql-13.service failed because the control process exited with error code. See "systemctl status postgresql-13.service" and "journalctl -xe" for details.

 

 

 

 

< p2 >

 

postgres 유저로 변경

su postgres

 

data 삭제

rm -r /var/lib/pgsql/13/data

 

active 인 p1 으로부터 data 를 복사해서 p2 에 넣음

pg_basebackup -h p1 -D /var/lib/pgsql/13/data -U replication -Fp -Xs -P -R

중간 -h p1 에는 p1 의 ip 가 들어감

 

data 로 이동

cd /var/lib/pgsql/13/data


postgresql.conf 에 아래 추가

hot_standby = on
hot_standby_feedback = on

 

exit 명령어로 postgres 유저에서 root 유저로 나옴

 

postgresql13 재시작

systemctl restart postgresql-13

 

 

 

 

 

< active, standby 설정 확인 >

 

아래 명령어를 통해 wal 프로세스를 확인

ps -ef | grep wal 

 

active 인 p1 에서는 postgres: walsender 가 나와야 하고

standby 인 p2 에서는 postgres: walreceiver 가 나와야 함

 

 

psql 에 접근한 후, 아래 명령어 사용

select pg_is_in_recovery();

 

active 인 p1 에서는 f 가 나와야 하고

standby 인 p2 에서는 t 가 나와야 함

 

 

 

active p1 에서 psql 에 접근한 후, 아래 명령어로 테스트용 database 를 생성

CREATE DATABASE test;

 

standby p2 에서 psql 에 접근한 후,

테스트용 database 가 p2 에서도 잘 보이는지 아래 명령어로 확인

\list

 

 

 

< failover 및 failback>

 

active 인 p1 이 죽었을 때, standby 인 p2 가 active 역할을 하게 만드는 것(failover)과

죽은 active p1이 다시 살아났을 때 p1 이 다시 active 가 되는 것(failback)은 수동 작업에 의해 진행됨

수동작업 [참고]

 

자동 failover 및 failback 을 하려면

pgpool-II 라는 미들웨어가 필요함

 

 

참고

https://velog.io/@enosoup/PostgreSQL-HA-%EA%B5%AC%EC%84%B1

https://bingbingpa.github.io/postgresql-high-availability/

https://jinisbonusbook.tistory.com/71?category=939888 

https://browndwarf.tistory.com/4

https://chess-drive.tistory.com/56

 

 

 

 

 

 

 

'SQL' 카테고리의 다른 글

[Hive] url decoder 예제  (0) 2022.05.05
[SQL] hackerrank New Company 쿼리  (0) 2022.03.09
[MySQL] 여러가지 함수 예제 모음  (2) 2022.02.13
[SQL] WAL 이란 무엇인가?  (0) 2022.02.08
[Phoenix] update values 하는 방법  (0) 2021.11.29

 

local 컴퓨터 위에 airflow 와 postgreSQL 을 설치하고 이 둘을 연동하는 작업 진행

os 는 centos 7

python3 사용

 

 

< postgreSQL 설정 >

 

아래 명령어로 postgres 설치

출처 : https://www.postgresql.org/download/linux/redhat/


sudo yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm
sudo yum install -y postgresql13-server
sudo /usr/pgsql-13/bin/postgresql-13-setup initdb
sudo systemctl enable postgresql-13
sudo systemctl start postgresql-13
sudo systemctl status postgresql-13

출처에 가보면, centos 외에 ubuntu 등 다른 os 에 postgreSQL 설치하는 방법이 나와있으니 참고

 

"Peer's certificate issuer has been marked as not trusted by the user." 혹은

"Curl error (60) SSL certificate problem" 등의 SSL certificate 이슈가 나타난다면,

vi /etc/yum.conf 를 열고 "sslverify=false"를 추가

참고 https://eyeballs.tistory.com/544

 

postgreSQL 을 설치하면, postgres 라는 계정이 추가됨 (확인 : cat /etc/passwd | grep postgres)

postgres 계정명으로 psql 에 접근한 후,

airflow 가 사용할 database 및 user role 생성


su - postgres
psql


\list
CREATE DATABASE airflow;
\list
CREATE USER airflow WITH ENCRYPTED PASSWORD 'airflow';
\du
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;


airflow 가 사용할 db 의 이름은 airflow 이고,

postgresql 내에서 사용되는 계정명과 비밀번호 모두 airflow 로 설정함

자주 사용되는 postgresql 명령어는 가장 아랫쪽에 적어둠

 

 

< Airflow 설정 >

 

bashrc 를 열고 안에 AIRFLOW_HOME 추가


vi ~/.bashrc 

export AIRFLOW_HOME=/opt/airflow

source ~/.bashrc

 

아래 명령어로 apache airflow 설치

 


pip3 install apache-airflow

 

airflow db init 명령어 실행하면, sqlite 를 이용한 메타데이터 데이터베이스가 설정됨

airflow.cfg 를 아래와 같이 수정함


cd $AIRFLOW_HOME
vi airflow.cfg


sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
base_url = http://localhost:8080
web_server_port = 8080
executor = LocalExecutor
load_examples = False


 

sql_alchemy_conn 에 들어가는 airflow 의미는 다음과 같음

sql_alchemy_conn = postgresql+psycopg2://[계정명]:[비밀번호]@localhost:5432/[데이터베이스명]

위에서 계정명, 비번, DB명 모두 airflow 로 설정했기 때문에 모두 airflow 라고 적음

localhost 인 이유는, postgreSQL 데몬이 동일한 서버(localhost:127.0.0.1) 에 떠있기 때문

 

executor 는 병렬 처리가 불가능한 SequentialExecutor 가 기본값인데,

병렬 처리가 가능한 LocalExecutor 로 변경

 

airflow WebUI 포트를 변경하려면 base_url 과 web_server_port 의 기본 8080 포트 번호를 다른 값으로 변경하면 됨

 

load_examples 는 기본값이 True 인데, airflow 처음 실행시 생성되는 예제 dags 를 띄우지 않으려면 False 로 변경

 

 

이후 아래 명령어 실행


airflow db init
airflow users create --username admin --password admin --role Admin --email eyeballs@example.com --firstname admin --lastname admin
airflow scheduler -D > /dev/null
airflow webserver -D


airflow WebUI 에 설정되는 관리자 계정명과 비밀번호는 모두 admin 으로 설정함

email 에는 자신의 이메일을 적어줌

 

airflow db init 진행시 아래 에러가 난다면


ModuleNotFoundError: No module named 'psycopg2'

 psycopg2 설치


pip3 install psycopg2

설치시 모종의 이유로 에러가 난다면, psycopg2-binary 를 대신 설치


pip3 install psycopg2-binary

 

 

아래 url 로 airflow WebUI 접근


localhost:8080

 

로그인에 사용되는 계정명과 비밀번호는 모두 admin

 



 

< 자주 사용되는 postgresql 명령어 >

 

\list # 데이터베이스 확인

\c airflow # 데이터베이스 사용

\dt # 테이블 확인

\ds # Sequence 목록

\df # Function 목록

\dv # View 목록

\du # User 목록

show data_directory; # 데이터 디렉터리 확인

 

< postgresql conf 위치 >

 

/var/lib/pgsql/13/data

중간에 13은 postgres 버전을 의미함

 

 

 

 

참고

airflow 공식 docker-compose https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml

https://progressivecoder.com/how-to-setup-an-airflow-postgres-connection/

 

curl, yum install 등의 명령어 실행시 아래처럼 SSL 이슈가 나오는 경우

 


[MIRROR] pgdg-redhat-repo-latest.noarch.rpm: Curl error (60): Peer certificate cannot be authenticated with given CA certificates for https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm [SSL certificate problem: self signed certificate in certificate chain]
[FAILED] pgdg-redhat-repo-latest.noarch.rpm: Curl error (60): Peer certificate cannot be authenticated with given CA certificates for https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm [SSL certificate problem: self signed certificate in certificate chain]
Curl error (60): Peer certificate cannot be authenticated with given CA certificates for https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm [SSL certificate problem: self signed certificate in certificate chain]


 

yum.conf 파일을 열고

sudo vi /etc/yum.conf 

 

아래를 추가

sslverify=false

 

출처

https://stackoverflow.com/a/52697992

 

 

docker 에서 곧바로 centos 7 이미지를 가져오면


docker pull centos:7

내부에서 문제가 좀 많음

문제가 없는 centos 7 은 아래 Dockerfile 을 만들어서 직접 build 해서 만듦

 

FROM centos:7
ENV container docker
RUN (cd /lib/systemd/system/sysinit.target.wants/; for i in *; do [ $i == \
systemd-tmpfiles-setup.service ] || rm -f $i; done); \
rm -f /lib/systemd/system/multi-user.target.wants/*;\
rm -f /etc/systemd/system/*.wants/*;\
rm -f /lib/systemd/system/local-fs.target.wants/*; \
rm -f /lib/systemd/system/sockets.target.wants/*udev*; \
rm -f /lib/systemd/system/sockets.target.wants/*initctl*; \
rm -f /lib/systemd/system/basic.target.wants/*;\
rm -f /lib/systemd/system/anaconda.target.wants/*;
VOLUME [ "/sys/fs/cgroup" ]
CMD ["/usr/sbin/init"]

 


< build 명령어 >

$ docker build -t mycentos:7 .

 


< run 명령어 >

$ docker run --name mycentos7 --privileged -d mycentos:7 init

 

Dockerfile 출처

https://hub.docker.com/_/centos

2.


airflow 의 장점
- 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG 로 만듦
- backfill 가능
- 원하는대로 스케줄링 가능

dag_id 는 DAG 를 나타내는 유니크한 값

각 오퍼레이터 (bashoperator, pythonoperator....) 는 하나의 태스크 수행하고
여러 오퍼레이터가 모여 DAG 를 구성

task 와 operator 차이점
operator 는 단일 작업을 수행하는 역할
taskoperator 의 상태를 관리하고 사용자에게 시작/완료 등의 상태 변경을 표시하는 Airflow 의 내장 컴포넌트


Airflow WebUI 화면에서
성공한 태스크는 초록색
실패한 태스크는 빨간색
앞 태스크의 성공을 기다리고 있는 태스크는 주황색으로 표시

실패한 태스크는 Clear 버튼으로 실패한 부분부터 다시 실행 가능


 

 

3.


종료 날짜가 없다면 DAG 의 스케줄링이 계속 진행됨

스케줄링(schedule_interval)은 "@daily" 처럼 넣거나 "0 2 * * *" 처럼 넣을 수 있음

또한 3일마다 (dt.timedelta(days=3)), 2시간마다(dt.timedelta(hours=2)) 처럼
crontab 에서 못 하는 간격 (빈도) 스케줄링을 Airflow 는 할 수 있음



증분 방식이란, 전체 데이터를 가져오기 보다는 내가 원하는 날짜의 데이터만 가져와서 처리하는 것
예를 들어 openAPI 를 그냥 사용하면 30일치 데이터를 들고오지만
정해진 날짜 파라미터에 값을 넣으면 해당 날짜 데이터만 가져옴
이렇게 매일 다른 날짜 처리를 하며 데이터를 쌓아나가는 것이 증분 방식

이를 위해 동적으로 바뀌는 날짜 값을 알아야 함
execution_date 가 바로 그것임

execution_date 는 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프임
헷갈리기 때문에 execution_date 는 deprecated 되고 대신 다른 값이 생김
실제로 실행하는 날짜 'data_interval_end' 가 됨


기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end
tomorrow_ds deprecate
tomorrow_ds_nodash deprecate
yesterday_ds deprecate
yesterday_ds_nodash deprecate
prev_execution_date prev_logical_date
next_execution_date next_logical_date

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/


백필은 catchup 이 true 일 때,
시작 시간부터 현재 시간까지의 모든 과거 스케줄 간격마다 모든 작업을 수행하는 기능

Airflow 태스크는 원자성과 멱등성을 갖어야 함
- 원자성 : task 가 성공적으로 수행하거나, 실패시 시스템에 영향이 없는 상태에서 실패해야 함
  예를 들어 중간 결과물을 만드는 태스크가 쓰는 도중에 실패할 경우, 뒤의 태스크가 결과물을 이용할 수 있는 상황이 발생
- 멱등성 : 여러번 실행해도 같은 결과를 도출해야 함
  예를 들어 결과값에 append 를 하는 태스크는 여러번 실행하면 결과값이 계속 늘어날 것임

 

 

4.


jinja 템플릿은 런타임 시에 사용 가능한 문자열 변수
다음과 같이 중괄호 두 개로 사용 가능

BashOperator(
  task_id="test",
  bash_command=(
    "echo {{ execution_date.year }}"
    "echo {{ '{:02}'.format(execution_date.month) }}"
  )
)

jinja 템플릿에 사용 가능한 변수들은 모두 태스크 콘텍스트에서 살펴볼 수 있음
operator 실행시 kwargs 를 통해 태스크 컨텍스트 변수들이 전달되어 사용 가능

op_kwargs는 dict
op_args는 list


DummyOperator 는 EmptyOperator 로 바뀌었음
아무 작업도 하지 않으며, 여러 다른 작업들을 그룹화(fan-out) 하는데 사용

mock 테스트 용도로 사용됨
테스트시 EmptyOperator 를 사용하고, 나중에 다른 Operator 로 변경하는 식

BranchPythonOperator 에서 일부 작업을 건너 뛰고 싶을 때 EmptyOperator 를 대신 사용
(빈 경로를 가질 수 없기 때문)

참고 https://stackoverflow.com/a/57486645



태스크간 작은 데이터를 전달할 때 xcom 을 사용 가능(메타스토어에 저장되고, 메타스토어마다 저장 가능 크기가 다름)
태스크간 큰 데이터를 전달할 때, 모든 태스크가 접근 가능한 스토리지 사용하여 전달

xcom 혹은 스토리지로 데이터 전달시,

dag 에 나타나지 않는 의존관계
를 만듦
예를 들어 A태스크에서 토큰을 받아 xcom 에 넣었고 B태스크에서 꺼내 쓰는 경우
A와 B 사이에 의존 관계가 형성됨

원자성을 무너뜨릴 수 있음
예를 들어 B태스크에서 사용하려고 토큰을 꺼내보니 기간만료라서 사용 불가한 경우

xcom 에 저장되는 데이터는 반드시 직렬화가 가능해야 함
예를 들어 파이썬 람다는 xcom 에 저장 불가

xcom 을 직접 사용하는 대신
taskflow API 를 사용하여 데이터를 넘길 수 있음
마치 python 함수 파라미터로 데이터를 받는 느낌


 

5.


fan-out 구조 의존성

task_start = EmptyOperator(task_id="start")
task_start >> [task_A, task_B]

fan-in 구조 의존성

[task_A, task_B] >> task_finish


코드 내용에 따라 의존성 결정하기 위해 BranchPythonOperator 사용

    def choose_branch(**context):
        if context["execution_date"] == THE_DATE:
            return "b1"
        if context["execution_date"] > THE_DATE:
            return "b2"
        if context["execution_date"] < THE_DATE:
            return "b3"

    branching = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch)
    b1 = BashOperator(task_id='b1', bash_command='echo b1')
    b2 = BashOperator(task_id='b2', bash_command='echo b2')
    b3 = BashOperator(task_id='b3', bash_command='echo b3')
    c1 = BashOperator(task_id='c1', bash_command='echo c1')

    start_dag >> branching >> [b1, b2, b3]
    b1 >> c1


참고 https://wefree.tistory.com/38



트리거 규칙은 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수 조건

all_success (default): All upstream tasks have succeeded
all_failed: All upstream tasks are in a failed or upstream_failed state
all_done: All upstream tasks are done with their execution
one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
always: No dependencies at all, run this task at any time

 

6.


센서는 특정 조건이 true 인지 지속적으로 확인하고
조건이 true 라면 지정된 작업을 수행

센서는 상태가 true 가 될 때까지 혹은 타임아웃이 될 때까지 계속 확인 진행
true가 되거나 타임아웃이 지나면 확인을 멈춤


filesensor 의 경우 wildcard 를 사용해 파일을 감시하는 것이 가능함

파일이 writing 되는 도중에 filesensor 가 감지하고 그 불완전한 파일을 사용할 수 있기 때문에
파일이 성공적으로 쓰여지면 파일 이름에 success 등의 태그를 붙임
filesensor 가 success 태그가 붙는 파일만 감지하도록 만듦
이 방법은 filesensor 의 오용을 피하는 방법도 되지만,
Operator 의 원자성을 지키기 위한 방법도 됨(불완전한 파일이 시스템에 영향을 주지 않도록)

filesensor 대신 pythonsensor 사용도 가능
python 코드를 이용해서 특정 path 에 특정 파일이 저장되었는지 확인하는 센서이며
조건이 true 라면 true 를 return 함



센서는 (기본) 7일을 넘기면 timeout 이 나면서 실패하게 됨

매일 센서가 스케줄링되면, 7일동안 7개의 센서가 실행되게 되므로
리소스를 잡아먹으며, 새롭게 실행되어야 할 태스크가 실행되지 않는(센서 데드락) 원인이 됨
따라서 센서를 실행할 때는 reschedule 모드를 설정
센서 reschedule 모드 poke 동작을 실행할 때만 태스크 슬롯을 차지하며,
대기 시간 동안은 슬롯을 차지하지 않음

태스크 실행 수 제한 참고 : https://eyeballs.tistory.com/534



TriggerDagRunOperator 를 사용하여 Dag1 의 태스크에서 Dag2 를 실행할 수 있음
트리거되는 Dag2 는 schedule_interval 이 필요하지 않아서 None 값을 갖음

백필에 의해 Dag1 이 동작하면 이전에 실행된 task 상태는 삭제되고 새로운 태스크가 실행됨
이에 따라 Dag2 가 트리거되어 실행되는데,
트리거되는 Dag2 에서는 이전에 실행된 태스크가 삭제되지 않고 새로운 태스크가 생성



dag 실행시 dag_run 에 run_id 값이 뜸
run_id 값을 보고 해당 dag 가 어떻게 실행되었는지 알 수 있음

schedule__ : dag가 스케줄되어 실행됨
backfill__ : dag 가 백필에 의해 실행됨
manual__ : dag 가 수동으로 실행됨



Airflow 는 DAG 간 의존성을 관리하는 방법을 제공하지 않음
예를 들어 dag1, dag2 가 모두 성공하면 dag3 가 트리거되어 실행되는 경우
[dag1,dag2] >> [dag3] 처럼 의존성을 관리하는 방법은 없음

다른 방법으로 의존성을 만들 수 있음
dag3 에서 ExternalTaskSensor 를 사용하면 됨
ExternalTaskSensor 는 dag1 과 dag2 의 태스크를 살펴보다가, 둘 다 성공하게 되면 dag3 의 태스크를 실행함

이 방법을 사용하기 위해서는
dag3 의 ExternalTaskSensor 실행 날짜와
dag1, dag2 의 태스크 실행 날짜가 완벽하게 동일해야 함
그렇지 않으면 ExternalTaskSensor 가 dag1,2 의 태스크를 찾지 못 함

실행 날짜가 다르다면, 다른 만큼 차이값을 ExternalTaskSensor 에 미리 넣어주어야 함
만약 dag1,2 태스크 실행 시간이 매일 10시이고
dag3 ExternalTaskSensor 실행 시간이 매일 12시 라면,
2시간 차이가 난다고 ExternalTaskSensor 에 미리 알려주어 실행 날짜를 맞춤




TriggerDagRunOperator 을 이용하지 않고,
다른 방법으로 dag 를 트리거 할 수 있음
바로 CLI 와 RestAPI
이 두 가지 방법으로 dag 를 트리거하면, 트리거 하는 순간에만 dag 가 실행되고
dag가 꾸준히 스케줄링되지는 않음

다음과 같은 CLI 명령으로 dag1 대그 실행 가능

airflow dags trigger dag1
airflow dags trigger --conf '{"age": 12, "sex": "M"}' dag1

RestAPI 도 사용 가능


 

 

 

9.


테스트에 대한 내용
다시 보자


 

 

11.

모범 사례
마지막에 볼 것

 

12.


Airflow 아키텍처는 최소 세 가지 요소를 갖추고 있음

- 웹서버 : Airflow 의 프로세스
  파이프라인 상태정보 시각화
  사용자가 dag 수행 및 관리
  메타스토어의 직렬화된 dags 를 읽고 사용

- 스케줄러 : Airflow 의 프로세스
  dags 가 있는 dir 에 접근 가능해야 함
  dags 를 읽고 구문 분석 후 메타데이터에 저장
  실행할 태스크 결정하고 대기열에 배치

- 메타스토어(데이터베이스) : 메타데이터 저장
  직렬화된 dags 를 저장하고 있음



4 가지 익스큐터 유형

- SequenctialExecutor
  분산 환경에서 사용 불가능
  테스트용

- LocalExecutor
  분산 환경에서 사용 불가능
  단일 호스트용

- CeleryExecutor
  분산 환경에서 사용 가능
  멀티 호스트용

-KubernetesExecutor
  분산 환경에서 사용 불가능
  쿠버네티스 기반 컨테이너용



LocalExecutor 는 단일 머신에서 태스크 병렬 실행이 가능
기본적으로 워커 프로세스는 최대 32개 병렬 실행 가능
즉, 단일 머신에서 여러 워커가 동작하며 병렬 실행

CeleryExecutor 는 내부적으로 Celery 를 이용하여 실행할 태스크들에 대해 대기열을 등록
이후, 워커가 대기열에 등록된 태스크를 읽고 개별적으로 처리
즉, 다중 머신에서 여러 워커가 동작하며 분산 병렬 실행
대기열 메커니즘을 위해 redis, rabbitmq 등의 인메모리 데이터베이스 사용

KubernetesExecutor 는 쿠버네티스에서 워크로드 실행
airflow dag 의 각 태스크는 독립된 쿠버네티스 파드에서 실행

[그림 설명 링크]



airflow 에서 일어나는 모든 일은 메타스토어에 저장됨
airflow 는 SQLAlchemy 를 이용하여 외부 Database 에 접근 및 태스크를 수행하여 메타스토어로 사용
메타스토어는 PostgreSQL 이나 MySQL 을 권장. SQLite 는 동시 쓰기가 지원되지 않음



스케줄러를 자세히 알아봄

스케줄러 역할은 다음과 같음
- dag 파일 구문 분석 후, 추출된 정보는 메타스토어에 저장
- 실행할 준비가 된 태스크를 결정하고 이를 대기 상태로 전환
- 대기 상태의 태스크 가져와서 실행(익스큐터에게 넘기나 봄)

스케줄러는 dag 디렉터리를 주기적(while loop)으로 확인하고 dags 파일 처리
dags 파일이 변경되지 않아도 주기적으로 확인
변경된 dags 이 있으면 구문분석 후 메타스토어에 저장

스케줄러가 dags 읽고 처리하는 데 CPU 가 소모됨
처리 주기를 줄이면 CPU 가 더 소모됨
처리 주기, 구문 분석에 사용될 프로세스 수 변경 등 옵션이 존재함

스케줄러는 실행할 태스크 인스턴스를 결정하는 역할도 함
각 태스크 인스턴스에 대해 모든 태스크가 종속성이 충족되는지,
모든 태스크가 정상적으로 마지막 단계까지 진행되었는지 확인 후
다음 순서로 실행되어야 하는 태스크들을 예약

실행이 예약된 태스크들은
해당 태스크에 쓸 수 있는 슬롯에 여유가 있을 때 (실행을 위해) 대기열에 넣어짐
태스크 인스턴스가 대기열에 배치되면 더 이상 스케줄러의 통제를 받지 않음
스케줄러의 역할은 실행 가능한 태스크 확인 후 예약, 그리고 대기열에 넣어주는 것 까지

태스크 익스큐터 프로세스가 그 다음 작업을 진행함
익스큐터(Local, Celery 등) 는 대기열에 태스크가 올 때 까지 기다림
익스큐터가
대기열에서 태스크 인스턴스 읽고, 워커에서 태스크 실행
익스큐터에서 태스크를 실행한다는 것은,
익스큐터에서 실행중인 태스크가 실패해도 airflow 자체에 영향이 가지 않도록 태스크를 실행할 새 프로세스를 만드는 것을 의미함

태스크의 상태는 (메타스토어에 저장되는) 하트비트로 알 수 있음



익스큐터는 스케줄러의 일부분
하나의 airflow 환경에서 사용 가능한 익스큐터 종류는 단 하나
태스크가 '실행 중' 상태로 바뀌었다는 말은, 익스큐터가 해당 태스크를 담당하여 실행하고 있다는 말




dag 의 task 실행이 실패하면
on_failure_callback 에 등록한 함수를 실행하도록 할 수 있음
여기 등록된 함수 내에서 slack 이나 email 로 메세지를 보내도록 하면 됨

성공시 콜백되는 함수를 등록하는 on_success_callback
태스크 재시도시 콜백되는 함수를 등록하는 on_retry_callback 도 존재함



 

 

 

 

 


DagRun 
- Task 인스턴스들을 DAG에 정의 된 특정 execution_date 에 실행하는 DAG의 인스턴스
  단순하게 표현하면 DAG의 실행 이력을 의미
  실행 중이거나 종료된 상태를 의미
- 원자성을 갖음 : 다른 리소스가 필요하지 않음
- 멱등성을 갖음 : 반복해도 동일한 결과를 보여줌

 

두번째, 세번째, 네번째 dagrun 이 실행하지 못하다가 5일날 모두 catchup 되었다면,

execution_date 는 원래 실행되었어야 했던 그 날의 execution_date 를 갖고

start_date 는 catchup 한 날짜(5일)이 됨

 

 

 

 

 

 

 

 

+ Recent posts