개인적인 포스팅입니다.

 

Spark 관련 기술 질문 꺼리들을 잊어버리지 않도록 여기다 적어둡니다.

 

 

2.


스파크 애플리케이션은 두 가지 프로세스로 구성
- 드라이버 프로세스 : main 함수 실행. SparkSession 이라고 불림. 애플리케이션을 제어.
  하나의 애플리케이션하나의 SparkSession 을 갖음
  스파크 애플리케이션(익스큐터 및 태스크)의 정보 유지 관리,
  클러스터 매니저와 통신하여 자원 얻고 익스큐터 실행 (드라이버yarn 의 AM 역할),
  사용자 프로그램이나 입력에 대한 응답,
  전반적인 익스큐터 프로세스의 작업과 관련된 분석,
  배포 및 스케줄링 역할
- 익스큐터 프로세스 : 드라이버 프로세스가 할당한 코드 작업(태스크) 수행
  작업 진행 상황을 드라이버 노드에 보고

모든 스파크 애플리케이션은 개별 익스큐터 프로세스 사용

클러스터 매니저는 스파크 앱 실행에 필요한 자원을 할당



모든 익스큐터가 병렬 작업할 수 있도록 파티션이라는 청크 단위로 데이터 분할
파티션 : 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식

dataframe 을 사용하여 파티션 된 데이터를 쉽게 처리

하나의 core 처리 = 하나의 파티션 = 하나의 task (in excutor) = 하나의 HDFS block
하둡은 데이터를 읽을 때 설정된 InputSplit 분할 정책에 따라 데이터를 분할하고
이 분할 정책에 따라 나눠진 데이터가 Spark 의 파티션 단위가 됨

HDFS 데이터 읽을 때 rdd 로 읽으면 HDFS 블럭 하나 당 파티션 하나

통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고,
Partition의 수가 많을수록 Core가 더 필요

파티션 수가 많으면 메모리 부담이 적어지고 병렬성이 올라감

Input Partition : 처음 파일 읽을 때 생성하는 Partition
기본값 128mb
HDFS 파일 크기가 128MB보다 크다면, Spark에서 128MB만큼 쪼개면서 파일 읽음
파일의 크기가 128MB보다 작다면, 작은 크기 그대로 읽음
읽은 파일 하나당 Partition 하나
대부분의 경우, 필요한 칼럼만 골라서 뽑아 쓰기 때문에 파일이 128MB보다 작음

Output Partition : 파일을 저장할 때 생성하는 Partition
기본값 64mb이나, 대개 기본 블록 사이즈로 세팅
repartition 혹은 coalesce 로 Ouput Partition 개수 설정 가능

Shuffle Partition : 셔플 진행 후 생성되는 Partition
기본개수 200개
Shuffle Partition 하나 크기가 100~200MB 정도 나올 수 있도록 개수 조절하는 것이 좋음
Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 
Shuffle Spill(셔플에 필요한 메모리가 부족해서 (데이터 SerDe 후 IO 하여) 디스크를 대신 사용)이 일어날 수 있음

참고 https://tech.kakao.com/2021/10/08/spark-shuffle-partition/



dataframe 이 한 번 생성되면 변경할 수 없는 불변성을 갖음
변경하려면, 기존의 dataframe 에서 변경된 것을 적용한 새로운 dataframe 을 생성해야 함

좁은 트랜스포메이션
- 각 입력 파티션이 하나의 출력 파티션에만 영향
- filter, where 등
- 메모리에서 작업이 일어남

넓은 트랜스포메이션
- 각 입력 파티션이 여러 출력 파티션에 영향
- group by, sort 등
- 노드간 데이터 셔플이 일어남
- 결과는 디스크에 저장

일련의 트랜스포메이션은 하나의 dag 명령 만듦


지연 연산 : action 이 일어나기 전까지 연산을 기다리는 동작

조건절 푸시다운 : 조건을 데이터 소스에 위임
  예를 들어, RDB 에서 전체 데이터를 읽고 하나의 로우만 갖고 오는 작업 진행시,
  스파크는 하나의 로우만 갖고오도록 RDB 에 where 조건 처리를 위임하여
  RDB 에서 하나의 로우만 갖고옴

액션
- 일련의 트랜스포메이션으로부터 결과를 계산하는 명령어
- count, show 등
- 액션이 일어나면 spark job 이 실행



셔플 파티션 : 넓은 트랜스포메이션에 의해 셔플 후 생성되는 파티션의 기본값은 200
  수정하려면
  spark.conf.set("spark.sql.shuffle.partitions", "5")

  참고로 spark.default.parallelism 도 동일한 의미는 갖는데,
  spark.default.parallelism는 rdd 에만 적용되고 df 에는 적용되지 않음
  spark.sql.shuffle.partitions 는 df 에도 적용됨

계보 : 일련의 논리적 실행 계획
  계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
  DAG 가 계보(lineage) 라고 보면 됨
  넓은 트랜스포메이션이나 cache 가 쓰인 부분에서 디스크에 저장되면
  디스크에 저장된 곳부터 다시 실행이 가능하게 됨



최적의 익스큐터 개수 계산

다수의 작은 executor 문제
- 하나의 파티션을 처리할 자원이 불충분 할 수 있음
- 셔플, skewed 데이터의 캐시, 복잡한 연산의 transformation 수행 시 OOME 또는 disk spill이 생길 수 있음
- 같은 노드내 executor 끼리 통신하는데 드는 비용 낭비

소수의 큰 executor 문제
- 너무 큰 executor 는 힙 사이즈가 클 수록 GC가 시작되는 시점을 지연시켜 Full GC로 인한 지연이 더욱 길어질 수 있음

executor 당 메모리는 최소 4GB 이상, CPU 는 최대 5개


최적의 파티션 개수, 크기 계산

공식이 따로 없고 각 환경에서 최적의 값을 직접 찾아야 함
최소한 Executor 개수 x Core 개수 이상으로 설정해야 함 (병렬성을 맞추는 최소 값)

일반적으로 파티션의 개수를 늘리는 것은 (오버헤드가 너무 많아지는 수준이 되기 전까지는) 성능을 높여줌
CPU가 놀지 않도록 최소한 총 코어 개수 이상의 파티션을 사용
파티션 개수를 늘리는 것은 각 Executor에서 스파크가 한 번에 처리하는 양이 적어지므로 메모리 부족 오류를 줄이는데 도움
파티션이 부족한 것보다는 차라리 조금 더 많은 것이 나음
MapReduce는 각 task의 스타트업 오버헤드가 큰 반면 스파크는 그렇지 않음

반대로, 파티션이 너무 많아도 문제가 됨
스파크 드라이버가 모든 파티션의 메타데이터를 보관
Driver memory errors & Driver overhead errors를 유발 가능
모든 파티션을 Scheduling 하는 오버헤드
작은 사이즈의 파일들을 생성하기 위한 I/O가 많이 발생(block store에서)


스파크 메모리 설명


 

 

4.


구조적 API 는 정형, 반정형, 비정형 데이터 모두 처리 가능

df, ds 는 분산 테이블 형태의 컬렉션 

카탈리스트 엔진
- 스파크 자체 데이터 타입 정보를 갖음
- 다양한 실행 최적화 기능 제공

DataFrame
- 스키마 데이터 타입 일치 여부 런타임에서 확인
- Row 타입을 갖는 DataSet
- Row 타입은 연산에 최적화된 인메모리 포맷
- Row 타입은 구조적 API 에서 데이터를 표현하는 데 사용하는 내부 카탈리스트 포맷
- Row 타입 사용시 GC 및 객체 초기화 부하를 피하고, 스파크 자체 데이터 타입 사용하여 효율적임

DataSet
- 스키마 데이터 타입 일치 여부 컴파일 타임에서 확인
- 사용 언어가 갖는 데이터 타입을 사용
- 타입 안정성을 지원



구조적 API 전체 실행 과정

1. df,ds,sql 코드 실행
2. 논리적 실행 계획 변환
3. 물리적 실행 계획 변환
4. rdd 를 이용하여 실행

논리적 실행 계획
- 추상적 트랜스포메이션만 표현
- 드라이버나 익스큐터 정보 고려하지 않고 오로지 코드 위주로 확인 (코드 문제는 없는지, 최적화 할 수 있는지)
- 다양한 표현식 최적화
- 코드 유효성 검증
- 테이블이나 컬럼 존재 여부 검증
- 카탈리스트 옵티마이저로 최적화

물리적 실행 계획
- 논리적 실행 계획을 클러스터 환경에서 실행하는 방법 정의
- 비용 모델 이용해 비교하여 최적화
- 일련의 rdd 와 트랜스포메이션으로 변환
  즉 스파크는 df,ds, sql 쿼리 rdd 트랜스포메이션으로 변환

실행
- rdd 대상으로 모든 코드 실행
- 처리 결과는 사용자에게 반환

 

 

6.


UDF 는 모든 언어에서 생성 가능
드라이버가 UDF 를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스에 전달하여 사용할 수 있도록 함

스칼라, 자바 언어로 만든 UDF 는 JVM 위에서 동작
따라서 스파크 내장 함수가 제공하는 코드 생성 기능의 장점을 활용할 수 없어서 성능 저하

파이썬 언어로 만든 UDF 는 모든 워커 노드에서 파이썬 프로세스를 실행하고
UDF 및 데이터를 직렬화하여 파이썬 프로세스에 전달
파이썬 프로세스에서 UDF 를 실행한 후 결과를 다시 스파크 익스큐터 프로세스에 전달

파이썬 프로세스 시작 부하,
파이썬 프로세스로 데이터 직렬화 및 전달 부하 가 있음


 

 

8.


셔플 조인
- 큰 테이블과 큰 테이블이 join 할 때 사용
- 전체 노드 간 네트워크 통신 발생 (join 하는 내내 발생)

브로드캐스트 조인
- 큰 테이블과 작은 테이블(단일 워커 노드 메모리에 들어갈 수 있을 정도) 이 join 할 때 사용
- 작은 테이블을 전체 워커에 복사. 이후로 추가적인 네트워크 통신 불필요
- 각 워커에서 join 진행


 

9.


다수의 파일이 존재하는 폴더 읽으면,
폴더 내 각각의 개별 파일DataFrame 의 파티션이 됨

DataFrame 을 (dest에) 쓸 때,
DataFrame 파티션 당 하나의 파일이 생성


파티셔닝
- 어떤 데이터를 어디에 저장할지 제어하는 기능
- 필요한 파티셔닝 데이터만 읽을 수 있음
- 파티셔닝 결과를 보면 여러 dir 가 있는데, 각 dir 마다 서로 다른 데이터가 내부에 들어있음
- 예) Hive Table 에서 날짜별로 파티셔닝

버켓팅
- 동일한 버킷 ID 를 갖는 데이터는 하나의 물리적 파티션에 모여있게 됨
- 그래서 데이터 읽을 때 셔플을 피할 수 있음
- 지정된 칼럼의 값을 해쉬 처리(버킷ID)하고, 지정한 수의 파일로 나누어 저장
- 버켓팅 결과를 보면 여러 파일들이 있는데, 각 파일마다 동일한 버킷ID 를 갖는 데이터가 내부에 들어있음

파티션은 데이터를 디렉토리로 나누어 저장하는 방식이고, 
버켓팅은 데이터를 파일별로 나누어 저장



스파크는 데이터를 쓸 때, 자동으로 파일 크기를 제어할 수 있음
데이터를 쓸 때 너무 크지도, 너무 작지도 않은 최적의 파일 크기를 계산하여 파일을 생성(write)함


 

 

10.


(Hive 대신) 스파크SQL 을 사용하기 위해,
쓰리프트 JDBC 서버나 SQL 인터페이스(hue, beeline2)를 사용해 스파크 driver 에 접속한 뒤 쿼리 실행 가능
하지만 스파크 SQL 은 OLTP 가 아닌 OLAP 로만 사용

스파크 프로그래밍에서 SQL 의 결과는 df 로 나타남
df 을 스파크 SQL에서 처리하려면 tempview 에 등록해야 함

Spark 는 hive 와 연동하여, 관리형 테이블과 외부 테이블을 가질 수 있음
관리형 테이블 : 데이터를 직접 관리하는 테이블
외부 테이블 : 데이터를 직접 관리하지 않는 테이블


 

 

11.


Dataset 은 JVM 기반 언어에서만 사용 가능

스파크가 Dataframe 으로 String, Integer 등의 데이터 타입을 처리할 때,
해당 타입의 객체를 생성하기보다는 Row 타입의 객체를 변환해 데이터를 처리

반대로, 스파크가 DataSet 으로 String, Integer 등의 데이터 타입을 처리할 때,
Row 타입이 아닌 사용자가 정의한 데이터 타입(String, Integer)으로 변환해 처리
사용자 정의 데이터 타입 사용하면 성능이 안 좋음
하지만 파이썬 UDF 함수 사용 성능이 더 안 좋음
왜냐하면 프로그래밍 언어 전환하는 것에 오버헤드가 있기 때문



DataSet 을 사용하는 이유
- df 기능만으로 수행할 연산을 표현할 수 없는 경우
- 타입 안정성을 갖는 데이터 타입을 사용해야 하는 경우

정확도와 방어적 코드가 중요하다면, 
컴파일 타임에서 오류 확인이 가능한 DataSet 을 사용하는게 좋음
(예 : 타입이 유효하지 않은 작업 등)

 

12.


저수준 API 는 두 가지 종류가 있음
- RDD : 분산 데이터 처리
- 브로드캐스트 변수, 어큐뮬레이터 : 분산형 공유 변수 배포 및 처리

저수준 API 를 사용하는 경우
- 고수준 API 에서 제공하지 않는 기능이 필요한 경우
- RDD 를 사용해 개발된 기존 코드 유지
- 사용자 정의 공유 변수를 다루는 경우



RDD : Resilient Distributed Dataset
불변성을 가지며, 병렬로 처리할 수 있는 파티셔닝된 레코드 모음
(구조적 API 와 마찬가지로) 액션 실행 전 지연 트랜스포메이션

Dataframe 의 각 레코드
는, 스키마를 알고 있는 필드로 구성된 구조화된 Row 이지만
RDD 의 레코드는, (프로그래머가 선택한) 언어의 객체일 뿐. Row 개념이 없음

RDD 레코드는 완벽하게 제어 가능하지만,
최적화를 하려면 (구조적 API 와는 다르게) 훨씬 많은 수작업이 필요
구조적 API 가 제공하는 여러 함수 사용 못하기에 수동으로 처리해야 함

구조적 API 와 다르게,
RDD 는 구조화된 데이터 엔진을 사용해 데이터를 저장하거나 다루지 않음

RDD API 는 모든 언어에서 사용 가능하지만
파이썬에서 RDD 를 사용하면 RDD 모든 레코드에 파이썬 UDF 를 적용하는 효과. 즉, 엄청난 성능 저하 발생

DataSet 과 다른 점은, DataSet 은 최적화 기법을 제공한다는 것


 

13.


pyspark 의 word count

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

flatMap : 각 파티션 내 모든 레코드를 다른 값으로 변환
map : 각 파티션 내 각 레코드를 다른 값으로 변환
reduceByKey : 주어진 레코드에서 같은 키를 갖는 것끼리 집계 연산



reduceByKey vs groupByKey

reduceByKey
- combine 을 미리 진행 후 셔플
- 모든 key 값을 메모리에 유지하지 않아도 됨

groupByKey
- combine 없이 모든 레코드를 셔플
- 모든 key 값을 메모리에 유지해야 함. OOM 에러 발생 가능



coalesce vs repartition

coalesce
- 동일한 워커에 존재하는 파티션을 합침
- 셔플 발생하지 않음 (데이터 이동은 함)
- 파티션을 늘릴 수 없음

repartition
- 노드 간 셔플 발생 가능
- 파티션을 늘리거나 줄일 수 있음


 

 

14.


브로드캐스트 변수 : 모든 머신(워커 노드)에 캐싱(저장)하는 불변성(final) 변수
  직렬화, 역직렬화 없이 사용 가능
  드라이버에서 생성한 객체(dict, list 등)를 직렬화 없이 모든 익스큐터에 전달해서 모든 익스큐터에서 사용 가능
  보통 태스크(워커 노드)에서 드라이버 노드의 변수를 사용할 때, 클로저 함수를 사용한다고 하는데
  이 방법은 직렬화/역직렬화를 유발하므로 성능 오버헤드가 있어서 브로드캐스트 변수 사용하는게 좋다고 함

어큐뮬레이터 변수 : 모든 머신(워커 노드)에서 드라이버로 값을 전달
  디버깅을 위한 카운터 생성 용도로 사용
  업데이트가 가능한 공유 변수 느낌으로 사용
  어큐뮬레이터의 값은 액션 처리 과정에서만 갱신됨
  각 태스크에서 어큐뮬레이터 값은 한 번만 갱신 가능


 

 

15.


클러스터 모드 : AM(스파크 드라이버)가 NM(워커 노드) 에서 동작
클라이언트 모드 : AM(스파크 드라이버)가 클러스터매니저(yarn) 외부에 있는 사용자 머신 위에서 동작
로컬 모드 : 모든 스파크 애플리케이션이 단일 머신에서 동작
  이 경우 병렬처리를 위해 단일 머신의 스레드 활용한다고 함



yarn 과 스파크 애플리케이션 실행 과정 (외부)

- 클라이언트는 스파크 애플리케이션(jar, lib 파일 등)을 제출
- 클라이언트 머신에서 코드 실행되어 RM 에게 스파크 드라이버를 위한 자원 요청
- yarn RM 은 NM 중 하나 선택하여 컨테이너 준비하고, 그곳에 스파크 드라이버(AM)실행. 드라이버는 JVM 위에서 동작
- 드라이버(AM) 은 RM 에게 익스큐터(자원) 요청
- RM 이 NM 위에 자원 확보 후 AM 에게 전달
- 익스큐터 생성. 모든 익스큐터는 JVM 위에서 동작
- AM 이 익스큐터에 태스크 전달
- 익스큐터가 태스크 실행
- 각 익스큐터 내 태스크들은 AM 에게 진행 상황, 상태 정보, 성공/실패 보고
- AM 이 받은 보고들은 RM 에게도 전달되어 클라이언트가 RM 을 통해 AM 상태 확인 가능
- AM 이 종료된 후, RM 은 익스큐터 종료하고 익스큐터로 사용된 자원을 회수


스파크 애플리케이션 실행 과정 (내부)

- 빌더 메소드를 이용하여 런타임에 SparkSession 생성 (왜냐면 여러 라이브러리에서 SparkSession 다수 생성하려는 상황 방지)
- SparkSession 생성 후 스파크 코드 실행 가능
- 코드에서 action 이 실행되면 스테이지와 태스크로 이루어진 job 이 실행됨
  액션 하나spark job 하나
- 스테이지 : 다수의 머신에서 동일한 연산을 수행하는 태스크 그룹
  좁은 트랜스포메이션으로 실행 가능한 태스크들을 하나로 뭉친 것(파이프라이닝)이 스테이지
  셔플(넓은 트랜스포메이션)이 진행되면 무조건 새로운 스테이지가 시작됨
- 태스크 : 단일 익스큐터에서 실행할 데이터의 블록과, 다수의 좁은 트랜스포메이션 조합
  태스크는 파티션 개수대로 생성
  따라서 파티션 개수가 많아지면 병렬 처리성이 올라감
- 셔플의 결과는 local disk 에 쓰여짐
  job 이 실패할 경우, local disk 에 쓰여진 셔플 결과부터 다시 진행 가능
  (재셔플이 필수인 상황에서는 부작용으로 보이지만, 대부분의 상황에서는 시간 절약해주는 기능)


 

16.


스파크 job 은 별도의 스레드를 사용해 여러 잡을 병렬처리 가능
스파크 스레드 스케줄러는 스레드 안정성을 충분히 보장

기본적으로 FIFO 스케줄러를 사용하여 스레드 스케줄링을 진행하지만
Fair 스케줄러도 사용 가능

Fair 스케줄러는 여러 개의 job 을 pool 로 그룹화하기도 함
개별 풀에 다른 스케줄링 옵션이나 가중치, 우선순위 설정 가능

 

 

19.


성능 튜닝의 핵심이라면 당연 셔플 처리 시간 단축

코드의 주 언어와 UDF 의 언어 등이 다르게 되면,
데이터 타입과 처리 과정을 엄격하게 보장하기 어려움



스파크는 워크로드에 따라 job 에서 필요한 자원을 동적으로 조절 가능
'동적 할당'은 애플리케이션이 진행되는 중간에 스파크 애플리케이션이 익스큐터를 요청하고 제거할 수 있는 방식
애플리케이션이 자원이 사용 가능해질 때 자원을 가져다 쓰고 필요하지 않을 때 자원을 되돌려 주기 때문에
클러스터에서 성능을 끌어 올릴 수 있음

참고 https://exmemory.tistory.com/92



많은 수의 크기가 작은 파일 처리
- 많은 수의 데이터 파일 찾아 읽기 태스크 실행
- 네트워크와 잡 스케줄링 부하
- 병렬처리성 올라감

적은 수의 크기가 큰 파일 처리
- 스케줄링 부하가 덜 함
- 태스크 실행 시간 길어짐
- 병렬처리성 떨어짐

적은 수의 크기가 큰 파일 처리하는 경우,
입력 파일 수보다 더 많은 태스크 수를 설정병렬성 높일 수 있음
왜냐하면 스파크는 분할 가능한 파일 포맷을 사용한다고 간주하고
여러 태스크에서 처리할 수 있도록 크기가 큰 파일을 알아서 분할하기 때문 (input partition)



자바/스칼라 UDF 사용시, 데이터를 JVM 객체로 변환하고 쿼리에서 레코드당 여러 번 수행되므로
성능이 좋지 않음
(파이썬 UDF 는 말 할 것도 없이 더 좋지 않음)

RDD 캐시 : 물리적으로 데이터(bit 값)를 캐시에 저장
구조적 API 캐시 : 물리적 실행 계획을 저장

RDD cache() : 저장소는 MEMORY_ONLY
RDD persist() : 파라미터로 받은 곳을 저장소로 사용
  persist 실행시, job 이 끝나도 메모리에 데이터가 상주해있음
  따라서 unpersist 로 해제해야 함
캐시를 직렬화하여 저장 가능한데, 직렬화하여 캐싱하게 되면 공간 효율성은 좋지만 CPU 를 더 소모함

캐시는 데이터를 재활용하는 코드에서는 좋지만,
캐싱할 때 직렬화/역직렬화를 하기 때문에 성능이 저하되는 부분이 있음



파케이 포맷이 좋은 이유는, 쿼리에서 사용하지 않는 데이터를 빠르게 건너 뛸 수 있기 때문
스파크는 파케이 데이터소스를 내장하고 있어서 파케이 포맷과 호환이 잘 됨

분할이 가능한 파일 포맷(파케이, gzip 등)을 사용해야 읽기 작업시 모든 코어를 활용할 수 있어 병렬성이 올라감
분할 불가능한 파일 포맷(json, zip, tar 등)은 단일 머신에서 전체 파일을 읽어야 하므로 병렬성이 떨어짐



스파크 익스큐터 내 task 에서 셔플이 일어나면 익스큐터 내 disk 에 셔플 결과가 쌓이고
다른 노드에서 해당 결과를 가져가는데(pull)
만약 셔플 결과를 쌓는 task 가 죽었다면, 셔플 결과까지 사라짐

이런 경우를 방지하기 위해, 셔플 결과는 익스큐터 외부에 있는 곳에 쌓아두어
불의의 사고로 task 가 죽어도 셔플 결과는 안전할 수 있도록 함
그래서 셔플이 다시 일어나지 않도록 하여 성능을 향상시킬 수 있음 (아래 이미지 참고)


https://www.waitingforcode.com/apache-spark/external-shuffle-service-apache-spark/read

 

 

 

아래부터는 Udemy 에 있는

Apache Spark 와 Scala로 빅 데이터 다루기 강의 필기

 

rdd 는 spark context 를 사용하여 만들 수 있음

rdd 는 resilient distributed dataset 이며 여기서 말하는 dataset 은 Spark 의 API 중 하나인 DataSet 을 의미하는 게 아님

 

spark context 로 local, s3, hdfs 등의 데이터를 읽는 방법

sc.textFile("filte:///home/eyeballs/sample.txt")

sc.textFile("s3n://a/b/c.txt")

sc.textFile("hdfs:///Cluster/user/eyeballs/sample.txt")

 

그 외 Hive, HBase, Cassandra, JDBC 등 다양한 곳의 데이터를 읽고 rdd 로 만들 수 있음

 

rdd 로 하는 것은 다음과 같음

map, flatmap, filter, distinct, sample, union, intersection subtract, cartesian 등

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

- 스파크가 하둡보다 나은 점

 ㄴ 대화형 쉘 지원

 ㄴ 다양한 함수 지원(Map Reduce 를 포함하여 집계함수나 MR, Streaming 등)

 ㄴ in memory, cache, 카탈리스트 엔진 등을 통한 작업 속도 향상

 

 

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-architecture.html

 

- 위의 그림을 참고.

  하나의 노드 위에서 여러개의 executor 가 실행될 수 있지만, 하나의 executor 가 여러 노드에 걸쳐있을 순 없다.

  하나의 데이터 파티션은 하나의 executor 위에서 처리되지만, 여러 executor 위에서 처리될 순 없다.

  하나의 executor 가 갖는 core 수에 따라 파티션을 받고, 그 수 만큼 병렬 처리 할 수 있다.

  한 번 생성된 executor 는 애플리케이션이 끝날 때 까지 남아있다.

  spark 에서 캐싱한 RDD 등은 executor 위에 남아있는다.

 

- 하나의 어플리케이션은 하나의 SparkSession 을 만든다. 즉, 하나의 어플리케이션 = 하나의 Spark Session

 

- 데이터가 물리적으로 여러 컴퓨터에 나뉘어져 있는 것이 파티션(청크) 

 

- 카탈리스트 : 실행 계획 수립이나 처리에 사용하는 자체 데이터 타입 정보를 갖고 있는 엔진.

  python 이나 R 등으로 구조적 API 를 사용해도, 카탈리스트가 내부적으로 스파크 데이터 타입으로 변환하여 명령을 효율적으로 처리.

 

- Dataframe 은 Row 타입으로 구성된 Dataset임.

  Row 타입은 자체 데이터 포맷을 사용하기 때문에, Garbage collector 나 객체 초기화 부하가 있는 JVM 데이터 타입 사용 안 함.

  따라서 매우 효율적인 연산이 가능.

 

- API 실행 과정

  0. 코드

  1. 논리적 실행 계획 : 카탈리스트 옵티마이저가 사용자의 코드에서 최적화 할 수 있는 부분을 찾아 최적화

  2. 물리적 실행 계획 : 논리적 실행 계혹이 실제 클러스터에서 실행하는 방법을 정의. 비용 모델을 비교하여 최적의 전략 선택.

         이 과정에서 사용자 쿼리를 RDD 트랜스포메이션으로 컴파일함.

  3. 실행 : RDD 를 대상으로 코드 실행

 

- 브로드캐스트 변수 : 모든 task 에서 볼 수 있는(공유되는) 불변값. 직렬화/역직렬화 하지 않아서 큰 데이터 공유하는 데 효율적임.

 

- SparkSession : 저수준 API, 기존 컨텍스트, 관련 설정 정보 등에 접근 가능.

    df, ds 를 생성하거나, 사용자 정의 함수 udf 를 등록할 때 사용. 

  SparkContext : SparkSession 으로 접근 가능. 스파크 클러스터에 대한 연결을 나타냄.

    App Name 이나 현재 worker nodes 위에서 돌고있는 executor 의 메모리 크기, 코어 수 등의 정보를 갖고 있음

    RDD 같은 저수준 API, 브로드캐스트 변수 등을 사용 가능.

 

- 실행 계획 : job > stage > task

  ㄴ job : action 에 의해 생성. action 이 2개면 job 2개 생성됨. 여러 stages 로 구성됨

  ㄴ stage : 넓은 transformation 에 의해 생성. 다수의 머신에서 동일한 연산을 수행하는 여러 tasks 의 그룹.

        하나의 stage 내에 tasks 들은 모두 같은 일을 하며, 병렬 처리 한다.

        따라서 stage 1 이 끝나고 난 뒤에야 stage 2 가 시작된다.

  ㄴ task : 단일 익스큐터에서 실행할 데이터의 블록과 다수의 좁은 transformation 의 조합.

        파티션 개수에 따라 task 가 만들어짐. 만약 파티션이 5개면 5개의 task 가 만들어져서 5개가 병렬 실행됨.

        병렬 처리되는 모든 task 는 서로 다른 데이터를 대상으로 동일한 코드를 실행한다.

 

- DAG 에서 각 job 의 스테이지 그래프를 만들고, 각 task 가 실행될 위치를 결정한다.

 

- action 에 의해 실행되는 job 은 하나의 DAG 를 만든다.

  액션이 한 번 호출된 이후에는 더 이상 DAG 에 추가 안 됨.

 

- spark 가 빠른 이유는, in memory 를 사용하기 때문도 있지만,

  코드 최적화 ( 좁은 transformation 을 파이프라인으로 뭉쳐서, IO 없이 한 번에 처리 ) 가 잘 되기 때문도 있다.

 

- Spark 에서 셔플이 발생하면, ( 사용자가 직접 캐싱하지 않아도 ) 셔플 결과가 디스크에 저장되어진다.

  나중에 재사용 할 일이 생기면, 셔플 결과를 다시 가져와서 사용한다. 마치 캐싱 한 것 처럼!

 

- 잡 스케줄링을 할 수 있다.

  스파크 애플리케이션에서 별도의 스레드를 사용해 여러 job을 동시에 실행 가능하다 ( 스레드 안정성은 보장 ).

  기본적으로 FIFO 큐를 사용하며, head 부분의 job이 전체 자원을 사용하지 않는다면, 남은 자원은 두번째 job 이 사용하여 병렬 처리 가능하다.

  FIFO 처럼 순서대로 처리할 수 있고, 또 round robin 방식을 사용할 수 있다. 이것을 fair scheduler 라고 한다.

  긴 job이 오면 처리하다가 짧은 job 이 오면 잠깐 자원을 줘서 실행시키고 다시 긴 job 을 처리하는 방식.

 

- 정적 할당 : 하나의 클러스터에서 여러 스파크 애플리케이션을 실행하려면,

  실행될 애플리케이션을 위해, 클러스터가 갖고 있는 제한적인 자원을 최대한 할당해줌.

  자원을 갖고 있는 애플리케이션이 끝나야 다음 애플리케이션이 실행됨.

 

- 동적 할당 : 하나의 클러스터에서 여러 스파크 애플리케이션을 실행하려면,

  워크로드에 따라 애플리케이션이 점유하는 자원을 동적으로 조정해야 함.

  애플리케이션이 사용하지 않는 자원을 클러스터에 반환하고 필요할 때 다시 요청하는 방식. 필요할 때 받고 다 쓰면 반납

  다수의 애플리케이션이 하나의 스파크 클러스터 자원을 공유하는 환경에서 유용함.

 

- 스파크의 모든 executor는 JVM (자바 가상 머신)을 하나 실행한 후 그 위에서 실행된다.

 

- 스파크 드라이버에서 모든 애플리케이션의 상태가 보관되며, 안정적으로 실행 중인지 확인 가능하다.

 

- RDD 캐싱 : 물리적 데이터(bit 값 그 자체)를 캐시(executor 의 ram)에 저장

  구조적 API 캐싱 : 물리적 실행 계획을 캐시(executor 의 ram)에 저장

 

- 캐싱 레벨로 Memory_only 를 사용할 때, JVM(executor) 에 할당된 메모리보다 RDD 크기가 크면, 캐싱을 하지 않는다.

 

- RDD 는 executor 혹은 slave node 위에 저장된다. 애플리케이션이 동작하는 동안 로드된 RDD 는 executor 노드의 메모리 위에 있다.

 

- executor 는 캐싱과 실행을 위한 공간을 각각 갖고 있는 JVM 이다!

 

- 파티션이 중간에 유실되어도, RDD 재계산에 필요한 종속성 그래프에 대한 충분한 정보를 통해 빠른 복구가 가능하다.

 

- 넓은 transformation 이후에 파티션 개수가 따로 지정되어있지 않다면, spark.default.parallelism 으로 직접 파티션 개수 지정이 가능하다.

 

- 각 executor 에 얼마만큼의 cpu cores 를 할당하는 게 좋은가?

  물론 컴퓨터 환경마다 다르겠지만, 일단 책의 내용은 아래와 같다.

  각 노드가 갖고 있는 cpu cores 를 5로 나눈 몫만큼 각 노드에 executor 를 설정한다.

  즉, 노드마다 5개 cores를 갖는 executor 가 최대한 들어가게 만든다.

  (샌디 라이자가 말하길, executor 당 코어는 최대 다섯 개 정도가 좋다고 함)

 

executor 의 메모리 사용 영역

- cluster manager 가 할당한 memory 자원으로 executor 가 만들어지면, executor 는 위와 같이 ram 을 구별하여 사용한다.

  1. overhead  : 어디에 쓰이는 건지는 아직 모르겠지만, 아무튼 필요함. 얼마만큼 필요한지 계산하는 방법은 아래 적어둠

  2. M 영역 : executor 가 갖고 있는 전체 memory 에서 overhead 를 제외한 부분.

        실행(transformation)과 캐싱을 위한 공간. 기본 값은 60%이며, 늘리거나 줄일 수 있음.

  3. 실행 영역 : transformation 을 처리하기 위한 공간

  4. R : 캐싱된 파티션을 저장하기 위한 공간. 캐싱된 파티션이 비어있으면 이 공간을 실행 영역으로 사용할 수 있다고 함(위에 그림처럼)

        (연산이 많지 않다면) R 을 벗어나 실행 영역으로 캐싱 파티션을 저장 가능하지만,

        연산이 많아지게 되면, 연산에 실행 영역을 보장해줘야 하므로 가장 오래 전에 사용된 캐싱 파티션을 제거한다(LRU)

 

- 각 executor 가 실행할 때의 memory overhead : Max ( 요청한 executor 메모리의 n % , 기본 오버헤드 값(384mb) )
  여기서 n % 는 default가 10 % 인데, 내가 맘대로 수정 가능.

  yarn container 에서 executor 를 실행한다고 할 때, yarn 의 memory 가 executor 에서 요청한 메모리+overhead 보다 커야 한다.

 

- executor 의 memory M 공간 계산 하는 방법

    M = spark.executor.memory * spark.memory.fraction

    (M = 전체 executor memory * M에 할당된 비율)

 

- executor 의 memory R 공간 계산 하는 방법

    R = spark.executor.memory * spark.memory.fraction * spark.memory.storageFraction

    (R = 전체 executor memory * M에 할당된 비율 * R에 할당된 비율)

 

 

 

 

 

 

 

https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/

https://data-flair.training/blogs/spark-tutorial/

https://shelling203.tistory.com/33

 

 

 

 

 

 

+ Recent posts