Dynamic Partition Pruning in Apache Spark


동영상(영어) : https://databricks.com/session_eu19/dynamic-partition-pruning-in-apache-spark

해설(영어) : https://blog.knoldus.com/dynamic-partition-pruning-in-spark-3-0/

슬라이드(영어) : https://www.slideshare.net/databricks/dynamic-partition-pruning-in-apache-spark

 

 

해당 포스트는 위 링크의 내용을 (모자란 실력이나마) 한글로 풀어써 본 것.
모자란 부분이나 잘못된 내용이 있으면 덧글로 알려주세용.

 

이탤릭 처리 되어있는 것은 필자의 주관이므로 그냥 넘겨도 무방.

 

 

 

 

Dynamic Partition Pruning 의 핵심은,

(full scan 을 피하기 위해) 내가 필요한 데이터만 골라서(pruning 해서 : 가지치기 해서) 적용(읽기)하는 것.

여기서 pruning 의 의미는, 내가 필요로 하지 않는 데이터를 읽지 않도록 피하는 것(가지치기 하는 것)

 

star schema queries 을 위해 중요하다고 함(가장 아래 설명 참고)

 

 

쿼리를 최적화를 하는 여러가지 방법(column pruning, constant folding, filter push down) 이 있는데,

Dynamic Partition Pruning 은 filter push down 을 사용한 최적화 기법이다.

 

 

구체적으로 filter push down 이 뭔지 살펴보자. 

 


Select * from Students where subject = ‘English’;

 

위와 같은 쿼리를 처리할 때 내부적으로 "데이터베이스->full scan-> filter" 하는 절차를 거친다.

 

필터 푸시 다운(filter push down) 이란, 필터를 scan 보다 먼저 둬서, 미리 필터링한 값을 scan 하는 방법이다.

full scan 을 피하고 원하는 값만 읽기 때문에 성능이 향상된다.

 

 

 

 

 

다음으로 partition pruning 이 뭔지 알아보자.

 

 

partition pruning 이란, 위에서 설명한 filter push down 기법을 partitioned files 에 적용한 것이다.

스파크 내 데이터가 partition 으로 분할된 것을 위의 그림에서는 색깔로 표시했다.

partitioned files 에 필터를 우선 적용하여, 원하는 partitioned files 만 scan 하는 것이 partition pruning 이다.

 

 

 

 

실제 작업을 할 때는, 하나의 partitioned file 에 하나의 single query 를 날리는 일 보다는

여러 partitioned files( tables ) 를 동시에 작업해야 하는 경우가 많다.

join 등의 operation 은 두 가지 테이블을 모두 사용하기 때문에

위에서 설명한 filter push down 만을 사용하여 성능 최적화를 하기는 어렵다.

 

예를 들어, 아래 쿼리를 실행한다고 하자.

 


Select * from Students join DailyRoutine
where DailyRoutine.subject = ‘English’;

 

DailyRoutine 이라는 테이블에서 subject 가 English 인 것만 뽑고,

Students 와 join 하는 쿼리인데, 이 때 Students 가 훨씬 큰 테이블이고 DailyRoutine 가 작은 테이블이다.

 

작은 테이블에 filter push down 을 적용해봤자, 큰 테이블의 레코드가 더 많기 때문에 성능적으로 큰 이득을 보기 어렵다.

 

 

데이터 엔지니어들은 대부분 작은 테이블과 큰 테이블을 모조리 스캔한 뒤 필터를 적용한 후 join하는 방식으로 두 개의 테이블을 join 한다.

악랄한 성능을 보일 것이 자명함

 

위 방법으로 join 을 하게 되면, full scan 으로 인한 오버헤드,

그리고 join 이후에 나온 중간 dataset 이 갖는 데이터 이중화(원본 데이터와 중간 데이터),

그리고 중간 데이터의 크기가 엄청 크기 때문에 관리하기 힘든 점 등에 의해

좋은 성능을 내기가 어렵다.

 

 

 

 

 

그래서 위와 같은 단점들을 해결하기 위해 (위의 그림처럼) Dynamic Pruning 을 활용하여 optimization 할 수 있다.

Dynamic Pruning 이란 dimension table(작은 테이블)의 필터링 결과를 fact table(큰 테이블) 에 적용하여 큰 테이블에서 full scan 을 하지 않고 필요한 데이터만 필터링하여 읽도록 하는 방법이다.

 

 

 

 

 

 

스파크에서 쿼리가 실행되면,

- 논리적 실행 계획 (Rule-based transformations 최적화)

- 물리적 실행 계획 (Stats-based cost model)

- RDD 를 이용하여 실제 분산 클러스터에서 작업 실행

을 거치게 된다.

 

동영상에서는

논리적 실행 계획 단계에서 (간단한 접근방법을 이용하여) Dynamic Pruning 이 어떻게 구현되는지 설명하고,

물리적 실행 계획 단계에서 이를 더 최적화 할 수 있는 방법을 설명한다.

 

 

 

 

< 논리적 실행 계획 단계 >

 

스파크의 분할된(파티셔닝 된) 데이터셋(큰 크기의 데이터셋)과 분할되지 않은 데이터셋(작은 크기의 데이터셋) 을 가정해보자. 왼쪽이 분할된 데이터셋인데 분할된 것을 색깔로 표현했다. 오른쪽은 분할되지 않은 데이터셋이라고 한다.

 

 

Non-partitioned dataset 에서 가져오는 데이터는 파란데이터와 초록데이터이고, 이 두 데이터를 Partitioned files 와 join 하고 싶다.

 

 

아무런 정책을 적용하지 않으면, partitioned files 는 full scan 을 하게 될 것이고 비용이 많이 소모가 될 것이다.

( 위 그림에서 FACT TABLE은 Partitioned files 를 담고 있는 테이블(큰 사이즈의 데이터를 갖는 테이블)을 의미하며, DIM TABLE 은 dimension table 의 약자로, non-partitioned dataset 을 담고 있는 테이블(작은 사이즈의 데이터를 갖는 테이블)을 의미함 )

 

 

위의 그림처럼, full scan 을 피하고 원하는 데이터(파란색, 초록색 데이터)만 콕콕 찝은 상태에서 join 을 하는 것이 최적의 성능을 낼 것 같다. (해당 문제 해결의 핵심적인 접근법)

그렇게 하려면 어떻게 해야 할까?

 

 

필터를 Partitioned files 에도 적용하는 것이다.

그럼 아래 그림처럼 될 것이다.

 

그림판 ㅈㅅ

 

빨간 점선 박스가 "데이터셋에서 파란색과 초록색만 뽑아달라"는 필터이다.

(Scan Table 이 먼저 오고 그 다음 Filter 가 적용된 것으로 보아 filter push down 은 적용되지 않은 그림인 듯)

빨간 점선 박스처럼 필터를 적용한 sub query 를 먼저 partitioned files에 적용한 후,

그 결과와, (오른쪽의) non partitioned dataset 의 필터 결과를 조인한다.

 

이 접근법의 단점은, 같은 쿼리(빨간 박스)가 중복되어 사용된다는 점.

따라서 이 쿼리 중복을 없애고 좀 더 효율적인 방법을 찾아본다.

이를 위해 물리적 실행 계획에서 위에 join 이 어떻게 동작하는지 알아보고 거기서 힌트를 얻은 방법을 적용해본다.

 

(내 생각. sub query 중복 문제가 발생했다는 것의 구체적인 의미가 궁금함.

같은 테이블을 대상으로 쿼리를 하는 행동을 두 번한다는 의미인지,      -> 이게 맞다.

똑같은 쿼리가 서로 다른 테이블에서 두 번 실행된다는 의미인지 모르겠음.

상식적으로 후자가 문제가 될 것 같진 않고, 문제가 된다면 전자일텐데

전자라는 전제하에 문제를 다시 파악해보자.

 

저 위에 빨간 점선박스 sub query 를 보면 dim table 에 대해 scan 하고 필터링 하는 두 가지 작업을 담고 있다.

이것을 fact table 위에서 똑같이 하는 것이 문제라는 말인데,

그럼 왜 미리 dim table을 필터링 한 결과값을 fact table에 필터로 적용하지 않고

다시 dim table 을 scan 하고 filter 를 적용하는 두 가지 작업을 하는걸까?      -> IN 을 사용하기 때문. 아래서 설명.

왜 그런지 보기 위해선 spark 가 내부적으로 쿼리를 어떻게 처리하는 지 알아야 할 것 같다.)

 

필자가 궁금한 사항에 대한 답변은 가장 아랫쪽 추가 설명으로 해결할 수 있겠다.

 

 

 

 

 

< 물리적 실행 계획 단계 >

 

 

먼저 broadcast hash join 의 동작 방식을 설명한다. 여기에서 최적화의 힌트를 얻을 수 있기 때문이다.

 

 

broadcast hash join 은 non-partitioned dataset(dimension table) 의 크기가 충분히 작으면 사용할 수 있는 효과적인 조인 방법이다(참고 https://eyeballs.tistory.com/247)

이 조인을 하기 위해 dimension table 의 값들을 hash 테이블 값(동영상에서는 이것을 build relation 이라고 부름)으로 변환하고,

build relation 값을 브로드캐스트 변수에 넣은 후, 각 spark workers들(위의 그림에 보이는 파란색,빨간색, 파란색) 에 뿌려준다.

그럼 각 worker 내부에서 받은 값과 partition 들을 locally join 한다.(그림에선 화살표로 나타냈지만 실제로는 각 worker 내부에서 지역적으로 join 이 일어남)

이렇게 함으로써 shuffle 과정을 생략하여 비용을 줄일 수 있다고 한다.

 

이렇게 broadcast hash join 이 진행되는데, 이러한 과정을 힌트 삼아 최적화에 적용/활용할 수 있다고 함.

위에서 언급한 sub query(빨간 박스) 중복 문제점을 극복하는 데 활용해보자.

 

 

 

위의 그림과 같이, build relation(그림상의 broadcast excahnge. filter push down 이 적용된 후 생성한 hash 값) 값을

partitioned dataset 에서 full scan 을 하기 전 단계에 'dynamic filter' 로써 넣어주면,

filter 를 거친 결과값(파란색, 초록색) 이 scan 된다.

그 후 join 을 진행하면 된다고 함.

이것이 바로 최적화하고 효과를 끌어올리기 위한 Dynamic Partition Pruning 의 핵심이라고 한다.

 

dimension table 에서 뽑은 hash table(build relation) 값을 fact table 에 필터로 적용시킴으로써

위에 논리적 실행 계획 단계에서 최적화 중에 발생한 sub query 중복 문제를 해결했다고 한다.

 

위에서는 브로드캐스트 해시 조인을 예로 들어 설명했지만, 아이디어만 여기서 가져온 것일 뿐

딱히 브로드캐스트 해시 조인이 아니더라도 Dynamic Partition Pruning 이 적용 가능한 것 같다.

아래 추가 설명 부분 참고.

 

 

 

 

 

 

 

< 추가 설명 >

 

논리적 실행 계획 단계 설명에서 필자가 빨간 점선박스에 대해 궁금해 했던 것 처럼,

빨간 점선박스가 당최 무엇인지 궁금할 수 있다.

그래서 다른 예제를 찾아 가져와봤다. 

(youtu.be/g-qZslQsOuE?t=813,

https://www.slideshare.net/databricks/whats-new-in-the-upcoming-apache-spark-30)

실제 쿼리까지 포함되어 있기 때문에 이해하기 더 쉽다.

 

 

 

위의 그림에서 t1 이 fact table, t2 가 dimension table 이다.

초록색 박스에 노란 Optimize 화살표가 붙어있다.

이 부분이 filter push down 을 적용할 수 있는 부분이다.

 

 

filter push down 을 적용하여 오른쪽처럼 Filter+Scan 이 되었다.

fact table 의 full scan 을 피하기 위해 t1.pkey IN 쿼리를 사용하였다.

해당 Filter 에는 t2 의 필터 쿼리가 들어가있다.

이것이 바로 위의 빨간 점선박스에서 나타나는 sub query 중복 문제인 듯 하다.

select t2.pKey 쿼리는 이미 왼쪽의 dim table 에서 사용되었는데,

오른쪽에서 dim table(t2) 을 기준으로 다시 한 번 사용되고 있다.

 

(내 생각. 그럼 왼쪽의 filter push down 결과를 재사용하면 되지 않을까?

이것을 하지 않는 이유가 있다면 두 가지 정도 생각해볼 수 있겠다.

첫째로 filter push down 결과를 재사용할 수 없어서

둘째로 재사용하는 것이 성능이 좋지 않기 때문에(병렬 처리 등의 이유로)

 

내가 궁금했던, 이 '재사용'한다는 부분이 바로 DPP 의 핵심임.

위에서도 설명했듯이, dim table filter 결과 값을 hash 로 만든 값을

fact table 에 filter push down 으로 적용!)

 

 

갑자기 이름이 DPP(Dynamic Partition Pruning) filter result 로 바뀌어버렸다.

위의 설명에 의하면, DPPFilterResult는 바로 t2 table(dim table) 의 filter 결과값(왼쪽 감청색 Filter+Scan)을

hash table 값으로 변환 시킨 값이리라.

 

(동영상에서는 '무언가를 재사용'해서 Filter 를 구성했다는 데... 발음이 너무 안 좋아서 이해가 안 됨 ㅠㅠ)

 

 

 

DPP Filter Result 를 사용하는 query 자체가 filter 이므로 이것을 filter push down 을 사용하여

원하는 partitions 만 t1 에서 뽑을 수 있도록 했다.

그리고 join 을 진행한다.

 

 

 

 

효과가 뛰어났다!

 

 

 

 

 

 

 

결론은 다음과 같다.

 

 

dynamic partition pruning 이용하여 이하 두 가지 측면에서의 최적화를 이룰 수 있다.

논리적 실행 계획 단계에서는 dimension table 의 필터를 찾고 fact table 에 적용하는 것

물리적 실행 계획 단계에서는 필터가 단 한 번만 적용되고, 그 결과값을 fact table에서 재사용 한다는 것

 

이로 인해 스파크는 불필요한 ETL denormalized 테이블을 만들지 않음으로,

star schema queries 를 잘 처리할 수 있게 되었다고 함.

 

가장 마지막에 말 하는 스타 스키마 쿼리는 흔히 알고 있는 관계형 데이터베이스 RDB 가 아니라

다차원 데이터베이스 MDB 에서 사용하는 개념이다.

아래 링크 참고

https://unabated.tistory.com/entry/%EB%8B%A4%EC%B0%A8%EC%9B%90-%EB%AA%A8%EB%8D%B8%EB%A7%8112

https://brunch.co.kr/@qqplot/27

http://www.incodom.kr/OLAP_%EC%86%8C%EA%B0%9C

 

 

 

 

 

Spark 3.x 이후 버전에서 Dynamic Partition Pruning 을 적용하려면 아래 옵션 값을 true 로 줘야 한다.

 

spark.sql.optimizer.dynamicPartitionPruning.enabled

 

근데 default 가 true 라서 딱히 만지지 않아도 알아서 적용 되는 듯.

아래 공식 문서 참고

https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration

 

 

 

 

참고

 

youtu.be/g-qZslQsOuE?t=813

https://www.slideshare.net/databricks/whats-new-in-the-upcoming-apache-spark-30

 

 

+ Recent posts