< scala map >

 

scala> val myList = List("abc", "bcd", "cde")
scala> val reverseList = myList.map((s:String) => {s.reverse})
scala> for(l <- reverseList){println(l)}

 

결과

 

cba
dcb
edc

 

 

 

 

< scala reduce >

 

scala> val numberList = List(1,2,3,4,5)

scala> val sum = numberList.reduce( (acc:Int, newNum:Int) => acc + newNum )

 

reduce 는 List 내 모든 원소에 대해 적용

reduce 에 들어가는 acc 는 acc+newNum 의 값이 되고

newNum 은 numberList 에서 새로 들어오는 값이 됨

 

차근차근 살펴보면 다음과 같음

acc : 1, newNum : 2. 결과 acc+newNum = 3 이것이 다시 acc 으로 들어감

acc : 3, newNum : 3. 결과 acc+newNum = 6 이것이 다시 acc 으로 들어감

acc : 6, newNum : 4. 결과 acc+newNum = 10 이것이 다시 acc 으로 들어감

acc : 10, newNum : 5. 결과 acc+newNum = 15

 

numberList 내의 모든 원소에 대해 전부 적용되었으니 마지막에 반환되는 값은 15

따라서 결과는 15

 

근데 아래처럼 String concat 으로 바꾸면 위의 논리와 맞지 않는 결과가 생김

scala> val ml = List("a","b","c")
scala> ml.reduce((x:String, y:String) => x+y)
결과 abc ????

 

reduce 를 어떻게 이해해야하지...??

 

추가) map 혹은 reduce 는 scala 자체가 갖고 있는 method 이기도 하지만

spark 가 갖고 있는 method 이기도 함

둘 다 동일하게 동작하지만 분산 처리 할 때는 spark 의 map, reduce 를 사용해야 함

 

 

 

< scala filter >

 

scala> val filterFives = List(1,2,5,3,4,5,6).filter((x:Int) => x!=5)

scala> val filterFives = List(1,2,5,3,4,5,6).filter( _!=5 )

 

filter 내부에는 boolean 값이 들어가나 봄

boolean 값이 true 인 원소만 남는 것 같음

 

또한 underline ( ' _ ' ) 을 사용하여 리스트 내의 모든 원소를 대신 표현 가능함

 

결과

List(1, 2, 3, 4, 6)

 

 

 

 

< scala List 연결(concat) >

 

++ 를 사용

 

scala> val concat = List(1,2,3)++List(4,5,6)

 

결과

List(1, 2, 3, 4, 5, 6)

 

 

 

 

< scala List 여러 함수 >

 

scala> List(1,2,3,4).reverse
결과 List(4, 3, 2, 1)

scala> List(3,2,4,1).sorted
결과 List(1, 2, 3, 4)

scala> List(3,2,2,3,2,1).distinct

결과 List(3, 2, 1)

 

scala> List(1,2,3).max
결과 Int = 3

scala> List(1,2,3).sum
결과 Int = 6

scala> List(1,2,3).contains(2)
결과 Boolean = true

 

 

 

 

 

 

< 자바에서의 hashMap >

 

스칼라의 Map 은 파이썬의 dict, 자바의 hashMap

 

scala> val myMap = Map("A" -> "a", "B" -> "b", "C" -> "c", "D" -> "d")
scala> myMap("C")

 

결과 String = c

 

scala> myMap.contains("E")
결과 Boolean = false

 

scala> util.Try(myMap("E")) getOrElse "Unknown"
결과 String = Unknown

 

 

 

 

 

< Spark Session builder >

 

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("EyeballsTest").master("local[*]").config("spark.some.config.option", "some-value").getOrCreate()

https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession
 

 

 

 

< csv 파일 읽기 >

 

import spark.implicits._

val myCsv = spark.read.option("header", "true").option("inferSchema", "true").csv("a/b/c.csv")

 

 

< tsv 파일 읽기 >

 

import spark.implicits._

val myCsv = spark.read.option("header", "true").option("sep","\t").option("inferSchema", "true").csv("a/b/t.tsv")

 

 

< txt 파일 읽기 >

 

txt 파일을 읽은 후 column 의 이름은 "value" 로 고정되어 사용됨

import spark.implicits._

val myTxt = spark.read.txt("a/b/d.txt")

myTxt.select("value").show(3)

 

 

 

 

 

 

< dataframe 에 sql 쿼리 적용 >

 

sql 쿼리 적용을 위해 dataframe 에 대한 tempview 를 만들어야 함

 

myCsv.createOrReplaceTempView("tempTable")

val fives = spark.sql("SELECT * FROM tempTable WHERE key==5")

 

 

 

 

< select >

 

people.select(people("name"), people("age") + 10).show(3)

 

 

 

 

< filter >

 

people.filter(people("age") < 21).show(3)

 

 

 

 

< sort >

 

people.sort("age").show(3)

people.sort($"age".desc).show(3)

 

 

 

 

< groupBy >

 

people.groupBy("age").avg("friends").show(3)

people.groupBy("age").agg(round(avg("friends"), 2)).show(3)

people.groupBy("age").agg(round(avg("friends"), 2).alias("rounds")).sort("age").show(3)

 

agg 함수를 통해 groupBy 이후 집계 처리가 가능

 

temps.groupBy("station").min("temperature").show()

 

groupBy 로 묶은 값 중 가장 작은 값(혹은 가장 큰 값)을 집계 가능

 

 

 

 

 

 

< show 로 전체 컬럼 출력하기 >

 

show 에 입력되는 파라미터를 column 의 개수로 주면 됨

예를 들어

people.show(people.count.toInt)

 

 

 

 

 

 

 

< explode and split >

 

split 을 통해 각 column 의 값을 정규식으로 분할

split 을 통해 분할된 각 값을 (explode 를 통해) 각 행(row) 으로 만들고 그 이름을 word 라고 칭함

 

people.select(

    explode(

        split("value", "\\W+")

    ).alias("word")

).filter("word" =!= "")

.show(3)

 

 

 

 

 

< udf 대상으로 broadcast 변수 사용하기 >

 

//broadcast 변수에 넣을 함수 선언

def loadMovieNames() : Map[Int, String] = {

...

}

 

//위에서 선언한 함수를 broadcat 변수에 넣음

val nameDict = spark.sparkContext.broadcast(loadMovieNames())

 

//익명함수 선언

val lookupName : Int => String = (movieID:Int) => {

    nameDict.value(movieID)

}

 

//위에서 선언한 익명함수를 udf 로 래핑

import org.apache.spark.sql.functions.udf

val lookupNameUDF = udf(lookupName)

 

//udf 의 결과값을 column 에 추가

import org.apache.spark.sql.functions.col

movieContents.withColumn("movieTitle", lookupNameUDF( col("movieID") ))

 

spark 의 각 executor 에 nameDict 를 배포했기 때문에

바로 위 withColumn 의 lookupNameUDF 를 실행할 때

각 executor 가 로컬로 갖고 있는 nameDict 함수를 그대로 사용 가능

 

 

 

< self join >

 

아래 코드에서 ratings 는 DataSet

as 를 사용하여 join 시 별명을 붙여 알아보기 쉽게 join 가능

 

val joinResult = ratings.as("ratings1")

.join( ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" )

.select(

  $"ratings1.movieId".alias("movie"),

  $"ratings2.userId",alias("user")

)

 

 

 

 

 

 

< SparkSession 닫기 >

 

spark.stop()

 

 

 

 

 

 

 

 

 

빅데이터를 지탱하는 기술 책을 정리한 내용

 

 

2.


트랜잭션처럼 사실이 꾸준히 기록되는 것을 팩트 테이블이라고 하고
팩트 테이블 정보를 구체적으로 나타낸 것디멘전 테이블이라고 함
계속 쌓여서 데이터를 차지하는 팩트 테이블에는 id 등의 값만 남겨두고
디멘전 테이블에 그 외 정보를 기록
그래서 팩트 테이블 크기를 최대한 줄임(String 값 등은 디멘전에 넣음)

데이터 마트는 스타 스키마로 구성하는 것이 좋음
이유는 다음과 같음
- 이해하기 쉬워서 데이터를 쉽게 분석 가능
- 구체적인 값은 디멘전 테이블을 둠으로써 팩트 테이블의 크기가 일파만파 늘어나는 것을 최소화(위에 적은 대로)
- 디멘전 테이블이 비정규화 되어 있어서 중복은 발생하지만 join 등의 성능은 좋음


열 지향 스토리지가 나옴에 따라, 데이터 마트를 스타 스키마가 아니라 단순히 하나의 큰 팩트테이블로 만드는 게 좋아짐
열 지향 스토리지는 칼럼 단위로 데이터가 저장되기 때문에, 칼럼 수가 늘어도 성능에 영향이 없고
컬럼 단위로 압축하기 때문에 디스크도 많이 잡아먹지 않음
비정규화 되어있기 때문에 join 등의 성능이 좋음

따라서 최근 데이터 마트는 비정규화 테이블(하나의 큰 팩트 테이블)을 사용
데이터 웨어하우스에 데이터를 축적할 때 스타 스키마는 아직 좋은 선택지임

열 지향 스토리지는 데이터들이 연속된 곳에 배치되기 때문에 읽기가 빠른데,
만약 쓰기를 여러번 하게 되면, 데이터들이 잘게 분산되어 연속되지 못한 곳에 쓰여지기 때문에
성능이 좋아지지 않음


 

 

3.


데이터 레이크 등에 저장된 비정형, 반정형 raw 데이터는 SQL 로 집계할 수 없기 때문에
이 raw 데이터를 정형 데이터로 변환하는 ETL 작업이 선행되어야 함

일반적으로 raw 데이터를 정형 데이터로 변환한 후
압축률과 쿼리 성능을 높이기 위해 열 지향 스토리지(Orc on Hadoop 등)에 저장
그리고 나면 SQL 로 효율적으로 집계 가능하게 됨



데이터 마트 팩트 테이블에 데이터를 넣을 때는
기존 데이터에 새로운 데이터를 추가(insert into)하거나
파티셔닝 된 부분을 덮어쓰는 방법(insert overwrite)이 있음

전자의 방법은 효율적일지 모르나
추가가 실패하여 데이터 누락이 발생할 수 있고
여러번 추가하여 데이터 중복이 발생할 수 있고
팩트 테이블 자체를 손 봐야 할 때 관리가 복잡해지는 이슈가 생김

따라서 데이터 마트 팩트 테이블에 데이터를 추가할 때(날짜, 시간 등으로)파티셔닝하고
파티셔닝을 overwrite 하는 방법으로 추가
이러면 데이터를 중복하여 추가하는 이슈에서 벗어날 수 있음

통계 처리를 위해 미리 중간 결과물을 만들어두면 효율적임
예를 들어 1년 동안의 uv 가 필요하다면
매 달마다 id 값을 distinct 해둔 결과물을 기록해두었다가
나중에 열 두달 짜리 distinct 결과물을 한 번에 처리하도록 함


 

 

4.


시계열 데이터란, 시간과 함께 생성되는 데이터

수집단에서 (SQL 혹은 API 를 통해) 데이터를 벌크형으로 꺼내와 DW 에 넣는 경우,
데이터를 꺼내오고 필터링 혹은 정형 데이터로 만드는 ETL 서버를 하나 둠
데이터 꺼내오는 작업은 1시간에 한 번 처럼 주기적으로 동작시키고
꺼내온 raw 데이터를 정형 데이터로 만들어서 DW 에 넣어주면 나중에 Hive 등으로 처리하기 쉬움

수집단에서 스트리밍으로 데이터를 만들고 보내주는 경우,
데이터를 일단 받아야하기 때문에 NoSQL 혹은 Kafka 등의 메세지 큐에 넣어둠
메세지 큐 없이 데이터를 곧바로 DW 에 저장하게 되면
부하 제어가 어려워 성능 한계에 도달하기 쉬움
메세지 브로커(kafka) 데이터 쓰기 속도를 조절하기 위한 완충 버퍼

메세지 브로커는 송신단과 수신단의 인터페이스 결합을 줄여주기도 함
또한 과거 데이터를 다시 처리할 수 있도록 도와줌



메세지 브로커에 담기는 정보들은 모두 at least once 로 전송된 것이라
중복이 있을 수 있으며, 이 중복은 사용자가 처리해야 함

중복이 발생할 때는 각 메세지 데이터에 시퀀스 번호나 offset 등의 추가 값을 붙여
중복된 데이터가 저장될 때 덮어쓸 수 있도록

메세지 데이터에 고유한 UUID 추가 값을 붙여
중복된 데이터가 저장될 때 '이미 들어온 값이네?' 하고 버려버릴 수 있도록 할 수 있음
이 경우 모든 UUID 를 기록해두는 것은 불가능하므로 최근 1시간 동안의 UUID set 을 기억해두는 방식으로 필터링

혹은 중복 데이터를 DL 에 쌓아두고, 나중에 배치 처리할 때 중복 데이터를 없애는 방식으로 진행할 수 있음



이벤트 시간 : 실제로 메세지가 생성된 시간
프로세스 시간 : 메세지를 처리한 시간
수집 시간 : 메세지를 받은 시간

네트워크 이슈로 인해 과거 이벤트 시간을 갖는 데이터가 지금 날아오는 경우도 있어서 이 부분을 처리해야 함

이벤트 시간이 포함된 데이터를 받아서 처리할 때는, Cassandra 처럼 시계열 인덱스를 지원하는 DB 에 넣고
이벤트 시간으로 인덱싱하여 정렬해 사용하는 것이 좋음
이 때 DB 를 열 지향 스토리지로 사용하여 쿼리 성능을 높임

혹은, 일단 프로세스 시간으로 파티셔닝하여 데이터를 저장한 후에
나중에 데이터 마트에 들어가도록 처리 할 때 이벤트 시간에 의한 정렬을 함께 하도록 함
파일이 조각나지도 않고 항상 최적의 데이터 마트를 유지 가능



HDFS 대신 NoSQL 을 사용해야 할 때는 다음과 같음
- 저장한 데이터를 업데이트 해야 할 때
- 데이터를 넣은 후 곧바로 집계 작업을 해야 할 때


 

 

5.


< 배치형 데이터 플로우 >

airflow 등을 트리거로 이용하여, raw 데이터를 HDFS 등의 데이터 레이크로 운반하는 스크립트 실행
이후 Spark 등을 이용하여 DL 내 raw 데이터를 원하는 대로 가공 후 DW 에 넣음
(비정형, 반정형 데이터를 정형 데이터로 가공하여 Hive 에서 사용할 수 있도록 한다던가)

DW 에서 DM 로 보낼 때는, (spark 등을 통해) 취급하기 쉬운 csv 등의 파일로 변환한 뒤 HDFS 에 저장하고
airflow 등을 트리거로 이용하여, HDFS 로부터 DM 로 csv 를 운반하는 스크립트 실행

요지는, spark 에서 데이터 source 와 데이터 destination 을 직접 연결하지 않고
데이터를 꼭 HDFS 등에 쓴 후에 spark 를 실행한다는 것
spark 를 직접 데이터 source 로부터 읽거나 데이터 dest 에 쓰면
읽기, 쓰기 작업이 오래 걸릴 수 있고,
네트워크가 끊기거나 하면 읽은 데이터를 잃어버리거나 쓰기 작업을 다시 해야 할 수 있기 때문
안정된 HDFS 에 데이터를 쓰는 것

주로 1년 이상의 장기적인 데이터 분석에 사용됨
데이터를 쌓아두기 때문에 재집계 가능


< 스트리밍형 데이터 플로우 >

배치형 데이터 플로우의 단점은 데이터가 쌓인 후에야 집계가 가능하다는 것과
배치 사이클이 오기 전에는 집계 결과를 볼 수 없다는 것
이벤트 발생에서 몇 초 후 집계 결과를 알기 위해서는 스트리밍형 데이터 플로우를 사용해야 함
예를 들어, 시스템 모니터링 등

스트림 처리는 HDFS 등의 분산 스토리지에 데이터를 저장하지 않고
받은 즉시 곧바로 처리해서 시계열 DB 등에 넣게 됨

과거 데이터를 다시 재활용하여 재집계하는 것은 부적합

배치형 데이터 플로우 진행시 데이터가 너무 많아서 의미있는 데이터만 배치형에 넣고 싶다면
HDFS 에 모든 데이터를 넣기 전에
스트림 처리를 통해 넣을 데이터를 선별한 후 나온 결과를 HDFS 에 넣는 것도 가능
예를 들어 DB -> Kafka -> 스트림 처리로 양을 줄임 -> Kafka -> HDFS (이후 spark 로 처리)



스트리밍 처리로 나오는 결과는 정확한 결과가 아닐 수 있음
가령 네트워크 이슈로 과거의 데이터가 지금 도착한 경우

하지만 실시간으로 데이터를 집계해야 할 필요가 있는 상황에서는 람다 아키텍처를 사용
실시간으로 데이터 확인이 필요하기 때문에 (결과가 부정확하지만) 데이터 스트림 처리를 하는 부분을 만들고
똑같은 데이터를 배치 데이터 플로우로 처리하여 나중에 일, 월 등의 긴 주기를 갖고 배치처리 한 결과를 정확한 결과로 보는 것
배치 처리 결과는 서빙레이어라는 부분에 모여서 사용자에게 서빙됨

배치 결과가 나오기 전까지, 스트리밍으로 처리된 결과는 참고용으로 볼 수 있음

람다 아키텍처는 같은 로직을 두 번 사용(스트림에서 한 번, 배치에서 또 한 번)한다는 것임
람다를 대체하기 위해 카파 아키텍처를 선택할 수 있음
카파는 배치와 서빙 레이어를 완전히 없애고 스트리밍 부분만 남긴 것
kafka 에 데이터 쌓아두는 기간을 늘림
과거 데이터를 재처리하고 싶다면, kafka 로부터 다시 읽음
카파의 단점부하가 급격하게 높아질 수 있다는 것
예를 들어 재처리를 위해 과거 한달 데이터를 한 번에 보내면 스트리밍 처리에 부하가 높아질 수 있음



왜 프로세스 시간이 아니라 이벤트 시간으로 처리해야 할까?
만약 스트림 처리가 모종의 이유로 인해 잠시 멈추었다고 가정함
재실행하면 그 동안 kafka 에 쌓여있던 데이터들이 한 번에 스트림에 들어오게 되는데
이 때 프로세스 시간이 정해지게 되고 
이를 대시보드 등으로 시각화 했을 때 흘러드는 데이터 양에 큰 변화가 있는 것처럼 보이는 이슈가 생길 수 있음
즉, 프로세스 시간으로 처리하면 스트림에 지연이 발생 할 때마다 스트림 처리 결과가 요동치게 됨
원래 데이터 모습을 알기 위해서는 이벤트 시간을 중심으로 실시간 처리해야 함
또한, 네트워크 이슈로 인해 과거 이벤트 시간을 갖는 메세지가 현재 도착한 상황에서
프로세스 시간을 기준으로 처리하면, 결과가 의도한대로 나오지 않을 수 있음

현재 도착한 데이터의 이벤트 시간이 과거라면, 이를 처리하기 위해 어떻게 해야 할까?
윈도우라는 개념을 도입하면 됨
윈도우 내에 도착하는 데이터들을 이벤트 시간 기준으로 정렬한 뒤 처리
만약 윈도우를 벗어난 데이터는 무시할 필요가 있다고 함
추가적으로 워터마크를 이용할 수 있음
워터마크는 (윈도우가 끝나고 나서도) 사용자가 지정한 기간 내에 딜레된 메세지를 처리할 수 있게 만들어 줌

참고 https://eyeballs.tistory.com/83
https://seamless.tistory.com/99

 

 

 

확인 할 사항

- 어떤 데이터인지 확인

  - 작은 손실이나 중복이 허용되는 데이터인지 아닌지

  - 데이터 용량은?

  - 데이터 포맷은?

- 실시간인지 배치인지

- 데이터를 수집하는 쪽에서 어떻게 보내주는지

  - 어떤 서버에서 보내주는지

  - 보내주는 서버는 몇 개인지

  - 보내주는 서버 종류가 다양한지

- 데이터를 사용하는 목적은 무엇인지

  - BI 인사이트 추출인지 아니면 학습용 데이터인지 등

- OLAP 가 많은지 OLTP 가 많은지

  - read 가 많은 작업인지 write 혹은 update 가 많은 작업인지

- 사용하는 서버 환경

  - 메모리, cpu, disk 등의 자원은 얼마나 되는지

  - 분산 환경인지

- 모니터링(premetheus, grafana)

 

 

전체 데이터 파이프라인


- raw 데이터를 데이터 레이크(HDFS)로 전송
- 가공(Spark, Hive 등) 하여 정형 데이터로
- HDFS 에 Orc, Parquet 등으로 파티셔닝하여 쓰고 Hive 로 쿼리

 

둘 다 columnar data format 인데

성능적인 면에서 차이가 남

 

 

파일 Write 성능 : (Parquet 보다) Orc 가 더 나은 성능

파일 Read 성능  : (Orc 보다) Parquet 이 더 나은 성능
출처: https://brocess.tistory.com/14

 

 

 

parquet 과 orc, json 등의 파일 포맷 간 성능 비교 자료

parquet 과 orc 각 컬럼 스키마에 따라 사이즈가 다르다는 게 눈에 들어옴

https://www.slideshare.net/HadoopSummit/file-format-benchmark-avro-json-orc-parquet

 

 

 

 

 

make 와 git , python 은 기본적으로 설치되어 있어야 함

(python 버전은 최신 버전이 좋음)

 

 

1. dependencies 를 설치

 

https://github.com/pyenv/pyenv/wiki#suggested-build-environment

 

 

  • Mac OS X:
    brew install openssl readline sqlite3 xz zlib tcl-tk
    If you haven't done so, install Xcode Command Line Tools (xcode-select --install) and Homebrew. Then:
  • Ubuntu/Debian/Mint:
  • sudo apt-get update; sudo apt-get install make build-essential libssl-dev zlib1g-dev \
    libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm \
    libncursesw5-dev xz-utils tk-dev libxml2-dev libxmlsec1-dev libffi-dev liblzma-dev
     
  • CentOS/Fedora 21 and below:
  • yum install gcc zlib-devel bzip2 bzip2-devel readline-devel sqlite sqlite-devel openssl-devel tk-devel libffi-devel xz-devel

 

 

 

 

 

 

 

2. 아래 명령어 실행


curl https://pyenv.run | bash
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init -)"' >> ~/.bashrc
echo 'eval "$(pyenv virtualenv-init -)"' >> ~/.bashrc
source ~/.bashrc

 

 

 

3. pyenv 로 설치할 list 확인


pyenv install --list

 

 

 

 

4. 아래 명령어로 설치/삭제


pyenv install 3.7.4
pyenv uninstall 3.7.4

 

 

 

 

5. pyenv 명령어들

 

pyenv 로 설치한 파이썬 버전들 확인


pyenv versions

 

가상 환경 생성 및 실행


pyenv virtualenv [python version] [environment name]
pyenv activate [environment name]
pyenv deactivate

 

 

 

참고

 

https://codingpackets.com/blog/pyenv-install-centos-8/

https://new93helloworld.tistory.com/326

https://flowerinmyheart.tistory.com/entry/CentOS-pyenv-%EC%84%A4%EC%B9%98

https://blog.visiodeibc.dev/pyenv-%EC%82%BD%EC%A7%88%EA%B8%B0-%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EB%B2%84%EC%A0%84%EA%B4%80%EB%A6%AC%EC%99%80-%EA%B0%80%EC%83%81%ED%99%98%EA%B2%BD-%EA%B5%AC%EC%B6%95%ED%95%98%EB%8A%94-%EB%B2%95-641c451512e6

 

 

airflow 의 DAG-level Permission 을 사용하면 다음과 같은 접근 제어가 가능함

- A 사용자는 A 사용자만을 위한 DAG 만 볼 수 있음
- A 사용자는 B 사용자을 위한 DAG 는 볼 수 없음

- A 사용자가 B 사용자의 DAG 를 보려면 권한(Role) 을 받아야 함

 

어떤 계정으로 접근하면 해당 계정에서만 보이는 dag 가 존재하고
그 dag 는 다른 계정에서는 보이지 않음

 

 

 

 

 

아래부터 DAG-level Permission 설정 방법에 대해 설명함

user1, user2 계정을 새롭게 만들고,

user1 계정으로 로그인해야만 보이는 dag 와

user2 계정으로 로그인해야만 보이는 dag 를 만들어 볼 예정

 

 

아래 절차대로 작업 진행
1. airflow Role 생성
2. airflow 계정 생성
3. 각 계정을 위한 dag 생성

 

 

 

 

 

 

< airflow Role 생성 >

 

admin 계정으로 접속한 후, Security - List Roles 클릭

 

Admin 을 클릭한 상태에서 Actions - Copy Role 클릭하여 Admin Role 을 복사

 

새롭게 복사한 Role 의 Edit record 버튼 누름

 

이름은 user1 로 명명

또한 Permissions 중에 'can read on DAGs' 와 'can edit on DAGs' 삭제 후 저장

 

여기까지 user1 을 위한 Role 을 하나 생성함
위와 같은 방식으로 user2 를 위한 Role 도 추가로 만듦

 

 

 

 

 

 

< airflow 계정 생성 >

 

아래 airflow 명령어로 user1, user2 airflow 계정 생성

 

airflow users create --username user1 --password user1 --role user1 --email user1@example.com --firstname user1 --lastname user1
airflow users create --username user2 --password user2 --role user2 --email user2@example.com --firstname user2 --lastname user2

Security - List Users 에서 잘 생성된 것을 확인

 

 

 

 

 

< 각 계정을 위한 dag 생성 >

 

user1 계정으로 로그인했을 때만 보이는 dag 는 아래와 같이 작성할 수 있음

 

only_for_user1.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "user1",
    "email_on_failure": "user1@example.com",
    "email_on_retry": "user1@example.com",
    "email": "user1@example.com",
    "retries": 3,
    "retry_delay": timedelta(minutes=1)
}

def _test():
    print("hi user1!")

with DAG(dag_id="only_for_user1",
    start_date=datetime(2022, 6, 25),
    schedule_interval="* * * * *",
    default_args=default_args,
    catchup=False,
    access_control={'user1':{'can_read', 'can_edit'}}
    ) as dag:

    test = PythonOperator(
        task_id="test",
        python_callable=_test,
    )

 

위와 같이 dag 를 작성한 후 genie_dev Role 을 살펴보면
다음과 같이 새로운 Role 이 추가된 것을 볼 수 있음

 

 

DAG 중간에 있는 access_control 을 해석해보면,


access_control={'[Role 이름]' : { '[권한]', '[권한]',.... }}


access_control={'user1':{'can_read', 'can_edit'}}


 

user1 이라는 Role 를 갖어야

dag_id 가 'only_for_user1'인 dag 를 볼 수 있다는 의미가 됨

 

user1 라는 Role 을  갖는 user1 계정으로 접근하면, only_for_user1 dag 가 보이게 됨

user2 라는 Role 을 갖는 user2 계정으로 접근하면, only_for_user1 dag 는 보이지 않음

 

실제로 airflow 화면은 아래와 같이 보여짐

 

 

 

user1 로 로그인 했을 때 보이는 airflow 화면

 

 

user2 로 로그인 했을 때 보이는 airflow 화면

 

 

 

 

 

같은 방식으로 user2 에서 보이는 dag 는 다음과 같이 작성 가능

 

only_for_user2.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "user2",
    "email_on_failure": "user2@example.com",
    "email_on_retry": "user2@example.com",
    "email": "user2@example.com",
    "retries": 3,
    "retry_delay": timedelta(minutes=1)
}

with DAG(dag_id="only_for_user2",
    start_date=datetime(2022, 6, 25),
    schedule_interval="* * * * *",
    default_args=default_args,
    catchup=False,
    access_control={'user2':{'can_read', 'can_edit'}}
    ) as dag:

    test = BashOperator(
            task_id='test',
            bash_command='echo "hello user2!"',
    )

 

 

 

 

 

user1 로 로그인 했을 때 보이는 airflow 화면

 

 

user2 로 로그인 했을 때 보이는 airflow 화면

 

 

 

 

 

only_for_user1.py 의 access_control 내용을 아래처럼 바꾸면

 


access_control={'user1':{'can_read', 'can_edit'}, 'user2':{'can_read', 'can_edit'}}

 

only_for_user1 dag 는 user1 혹은 user2 라는 Role 을 갖는 계정에서 보이게 됨

따라서 user2 Role 을 갖고 있는 user2 계정도 only_for_user1.py dag 를 볼 수 있음

 

 

 

access_control 을 수정 후, user2 로 로그인 했을 때 보이는 airflow 화면

 

 

 

 

 

 

 

 

참고

 

https://airflow.apache.org/docs/apache-airflow/stable/security/access-control.html 
https://swalloow.github.io/airflow-multi-tenent-1/ 
https://cloud.google.com/composer/docs/airflow-rbac?hl=ko

 

 

+ Recent posts