중간 중간 쓸 데 없는 내용이나 잘못된 내용이 들어가 있을 수 있으니 직접 판단해야 함 ;;;

 

1.


Hadoop 1 과 2 의 차이

Hadoop2 에 YARN 이 도입되어,
Hadoop 1 의 Task TrackerResource Manager와 Application Master로 분리되었고
MR 작업 등에서 자원 관리를 더 효율적으로 하게 됨
Hadoop2 에서 HA 구성이 가능하게 됨



여러 디스크(혹은 독립된 여러 서버)에 데이터를 병렬로 쓰거나 읽을 때 나타나는 문제점


- 하드웨어 장애
  해결 방법 : 데이터를 여러 곳에 복제 (RAID 혹은 HDFS 의 복제)

- 정합성을 지키는 일
  해결 방법 : 데이터 블럭 스캐너(체크섬을 통해 무결성 체크)를 사용하여 주기적으로 확인

- 어떤식으로든 분산된 데이터들을 하나로 결합해야 함
  해결 방법 : MR 모델 프로그래밍으로 결합 가능


정합성 Consistency : 분산된 데이터들이 모두 같은 데이터인지 아닌지
무결성 Integrity : 데이터 자체가 깨지지 않고 온전히 자신의 값을 유지하는지



맵리듀스는 일괄 질의 처리기
비정형 쿼리를 수행하고 합리적인 시간 내에 결과를 보여줌
대화형 분석에는 적합하지 않음
따라서 수 초 이내에 결과를 받는 것은 불가능

YARN (Yet Another Resource Negotiator) 은 클러스터 자원 관리 시스템
MR, Spark 를 포함한 어떤 분산 프로그램도 하둡 클러스터에 저장된 데이터를 처리할 수 있도록 해줌
YARN 의 도움으로, Spark 가 HDFS 에 접근해서 Data Locality 이점을 받을 수 있음



RDB 와 MR 비교

RDB 는 트랜잭션이 ACID(원자성(Atomicity), 일관성(Consistency), 격리성(Isolation), 지속성(Durability) ), MR 은 없음
RDB 는 쓰기 기준 스키마, MR 은 읽기 기준 스키마
RDB 는 여러 번 읽고 쓰기, MR 은 한 번 쓰고 여러번 읽기



정형 데이터 : 형식이 정의된 항목으로 구조화 된 데이터. 스키마가 뚜렷한 데이터. XML
반정형 데이터 : 스키마가 유연하거나 심지어 생략이 가능한 데이터. JSON
비정형 데이터 : 어떠한 내부 구조도 없는 데이터. 텍스트, 이미지

하둡은 읽기 스키마(schema-on-read) 구조이기 때문에, 처리 시점에 어떤 데이터가 와도 처리 가능



데이터 센터 환경에서 가장 중요한 자원은 바로 네트워크 대역폭 

대규모 분산 컴퓨팅에서, 분산 처리되는 각 프로세스를 조율하지 않아도 됨
task 등이 실패하면 알아서 재시작이 됨
MR 태스트들 간 상호 의존성이 없는 비공유 아키텍처이기 때문에 이런 일이 가능함


 

2.


Hadoop job 은 여러개의 Map 태스크, Reduce 태스크로 나뉘어짐
YARN 에 의해 스케줄링되어 클러스터의 여러 노드에서 실행됨
특정 노드의 태스크 하나가 실패하면 자동으로 다른 노드를 재할당하여 다시 실행

잡의 입력입력 스플릿이라 부르는 고정 크기 조각으로 분리
입력 스플릿 개수만큼맵 태스크 생성하고 맵 태스크의 input 으로 넣어줌
그리고 각 map 에서 해당 input split 을 처리함

스플릿 크기가 작을수록 부하 분산에 좋은 효과
반대로 크기가 작을수록 맵 태스크 생성 오버헤드가 있고, 스플릿 관리 오버헤드가 있어 실행 시간이 증가함
적절한 입력 스플릿 크기는 HDFS 블록 크기와 같은 크기
왜냐면 (다른 노드와 네트워크 통신을 일으키지 않는) 단일 노드에 저장된다고 확신할 수 있는 가장 큰 크기이기 때문

참고 : https://eyeballs.tistory.com/m/281

데이터 로컬리티 이득을 보려면, 데이터가 존재하는 노드에서 맵 태스크를 실행해야 함
만약 모든 데이터 복제본이 있는 노드에 자원이 없어서 맵 태스크를 실행하지 못하게 된다면
동일 랙의 다른 노드에서 가용한 맵 슬롯을 찾음 (왜냐하면 네트워크 전송 오버헤드를 최대한 줄이기 위해)
만약 그것도 없다면, 다른 랙의 다른 노드에서 가용한 맵 슬롯을 찾음



맵 태스크 결과는 local 에 저장됨
이것은 리듀스를 위한 중간 결과물인데, HDFS 에 넣었다가 복제되면 그건 그것대로 오버헤드이기 때문
만약 맵의 결과 데이터가 손실되거나, 맵 자체가 실패하게 되면
다른 노드에서 동일한 맵 작업을 재실행하게 됨

리듀스 태스크는 각 맵 태스크의 결과를 입력으로 받음(네트워크를 통해 입력받음)
따라서 리듀스는 데이터 로컬리티 장점이 없음

리듀스 태스크의 수는 사용자가 독립적으로 지정 가능
리듀스 태스크 수를 1개 두고 싶다? 그래도 괜찮음. 그럼 결과 파일이 하나만 나오게 됨
리듀스 태스크 수를 100개 두고 싶다? 그래도 괜찮음. 결과 파일이 100개 나오게 됨. 
맵은 리듀스 태스크 숫자에 맞게 맵 태스크의 결과를 파티셔닝(분할)하고, 맵의 결과는 각 리듀스에서 읽어감

컴바이너 함수는 맵의 마지막 단계에서 진행되며,
데이터를 미리 처리하여(리듀스가 할 것과 동일한 처리), 셔플 단계에서 네트워크로 전송되는 데이터 양을 줄이는 역할


 

3.


HDFS 에 쓰인 파일에 내용을 덧붙이거나, 삭제하거나, 중간 데이터를 수정하는 작업이 가능할까?
제한적이긴 하지만 가능하긴 함


dfs.support.append=true 설정을 주면, 파일에 끝에만 append 하여 내용을 덧붙일 수 있음
파일 중간에 새로운 내용을 추가하거나, 파일 중간 데이터를 수정하는 것은 안 됨
"수정"한 것과 동일한 효과를 얻으려면, 아예 파일 내용 자체를 새로운 내용으로 덮어써야 함

파일의 끝부분에 append 가 가능했다면, 파일 끝부분을 삭제하는 truncate 연산도 존재함
이 역시 파일의 중간 내용을 삭제하는 것은 불가능


파일에 append 로 새로운 내용이 추가된 경우,
HDFS 블록 일관성을 유지하는 데 추가적인 비용이 발생하게 됨






HDFS 블록 크기(128mb)가 큰 이유는, 탐색 비용을 최소화하기 위해서
블록의 시작점을 탐색하는 데 걸리는 시간을 줄일 수 있고, 데이터 전송에 더 많은 시간을 할애 가능
전송시간에 더 많이 투자해야하기 떄문에 탐색하는 시간을 줄이는 것. 탐색 시간을 전송 시간의 1% 아래로 잡음
(이 말인 즉슨, 전송 효율이 점점 좋아져서 전송 시간이 줄어들면, 그 만큼 탐색 시간(1%)을 줄이기 위해 블럭 크기를 키우게 될 것)
반대로 블록 크기가 클수록 분산 처리율은 떨어짐 (스플릿 크기와 같다는 전제)

블록 개념을 도입하여 얻은 이점
- 디스크보다 큰 파일을 저장 가능
- 블럭만 다루면 되니까 스토리지 관리가 쉬움
- 내고장성 fault tolerent (장애에 견디는 능력)가용성 availability (정상적으로 사용 가능한 능력) 을 제공하는데 필요한 복제 구현 가능

블록의 복제 계수를 높이면 읽기 부하를 클러스터에 전체에 분산 가능
블록이 여기저기 많으면 읽을 때 (가까운) 블록 찾기가 수월하겠지. 그래서 읽을 때의 부하가 줄어듦
반대로 복제 계수가 높으면 디스크 용량에 부담이 됨


Map 태스크는 한 번에 하나의 블록만 처리



HDFS 는 하나의 Name Node 와 (실질적으로 데이터를 물리적으로 저장하는) 여러 대의 data node 로 구성됨
NN 는 파일 시스템(HDFS)의 네임스페이스를 관리함
즉, 파일 시스템 트리와 그 트리에 포함된 모든 파일과 디렉터리에 대한 메타데이터를 디스크에 저장하고 유지함
파일의 위치 정보는 HDFS 시스템이 시작될 때, 그리고 주기적으로 (DN로부터) 받아서 메모리에 저장하고, 디스크에 저장하지 않음

네임노드는 아래 두 파일을 이용하여, 파일과 디렉터리에 대한 메타데이터를 유지
- 네임스페이스 이미지(=fsimage) : local 디스크에 영속적으로 저장.
    실제 메타데이터를 저장
- 에디트 로그(edit log) : local 디스크에 영속적으로 저장.
    주기적으로 업데이트되는 메타데이터들을 임시로 저장하는 곳.
    주기적으로 네임스페이스 이미지와 병합됨

(namespace image 는 없는 단어고, fsimage(file system image) 가 올바른 단어)

데이터노드는 블록을 저장하고 탐색, 블록 목록을 주기적으로 네임노드에 보고

네임노드 장애 복구 기능에 3 가지 방법 사용 가능
- 메타데이터를 주기적으로 백업
- 보조 네임노드secondary namenode를 사용
    주 네임노드의 메타데이터(네임스페이스)를 주기적으로 복제해서 갖고 있음
    주 네임노드를 대신하여, 에딧 로그와 네임스페이스 이미지를 병합해서 주 네임노드에 줌
- HA 구성 
    standby 네임노드를 구성. 이 standby 네임노드는 보조네임노드가 하는 작업(네임스페이스 백업, 에딧로그와 병합)을 함

네임노드는 파일 개수에 따라 저장하는 메타데이터가 정해짐
예를 들어 500mb 짜리 파일 하나를 HDFS 에 저장하면,
블럭은 총 4개(500/128 = 대략 4) 생기지만 파일은 하나이기 때문에
네임노드에 저장되는 메타데이터는 하나
128mb 짜리 파일 다섯개를 HDFS에 저장하면,
블럭은 총 5개 생기고 파일도 다섯개이기 때문에
네임노드에 저장되는 메타데이터는 다섯

참고 https://gyuhoonk.github.io/hive-merge-query


HDFS 는 POSIX(Portable Operation System Interface) 와 유사한 파일시스템 인터페이스를 제공




데이터노드에서 빈번하게 접근하는 블록
(자바 힙 외부에서 관리되는) off-heap 블록 캐시라는 데이터 노드의 메모리캐싱 가능
이렇게 캐싱이 존재하는 데이터노드에서 캐싱된 블록을 이용한 작업을 수행하도록 하면 작업 속도 빨라지겠지!

단일 머신에서 동작하는 네임노드는 이용할 수 있는 메모리에 한계가 있기 때문에, 시스템을 더 크게 확장(scale out)하기 어려움
이런 한계를 극복하기 위해 페더레이션 federation 을 활용할 수 있음
패더레이션 기능을 사용하면, 서로 다른 머신 위에서 동작하는 여러 네임 노드가 파일시스템의 네임스페이스 일부를 나눠 관리할 수 있게 됨

예를 들어 A 네임노드는 /user dir 아래 모든 파일을 관리하고
B 네임노드는 /share dir 아래 모든 파일을 관리


federation 에서 중요한 개념은 namespace volume 과 block pool 임

< namespace volume >

네임노드가 관리하는 네임스페이스
(위의 예제에서 A 네임노드의 네임스페이스 볼륨은 /user)

각 네임노드는 자신의 네임스페이스에 속하는 파일과 디렉터리 정보만 관리하게 됨 (그래 이게 federation 이지)
각 네임스페이스 볼륨은 독립적이며, 서로 다른 네임노드의 네임스페이스 볼륨 간 의존성이나 연결은 없음




block pool 가 뭔지 이해를 잘 못하겠음.. 내용이 확실하지 않아서 아래 내용은 취소선 그어둠
확실히 알게되면 다시 업데이트 할 예정

< block pool >

파일이 블록 단위로 어디에(어떤 DataNode 에) 저장되어있는지 에 대한 위치 정보
각 네임노드는 자신만의 독립적인 block pool 을 갖고 있음
데이터노드는 여러 개의 block pool 을 가질 수 있음
(=여러 네임노드에서 생성한 블록을 저장할 수 있음)

데이터노드는 각 네임 노드에 대한 block pool 을 따로 관리함
네임노드가 비정상 종료되어도, 데이터노드는 블록 데이터를 그대로 유지함
특정 네임노드의 block pool 이 손상되어도, 다른 네임노드의 block pool 에 영향은 없음 (독립적임)


정리하자면,

하나의 네임 노드가 갖는 것
- Namespace Volumn : 파일, dir 의 메타데이터
- Block Pool : 파일의 위치 정보

하나의 데이터 노드가 갖는 것
- (여러 네임 노드로부터 온) 여러 Block Pool

NameNode1 : /namespace1 과 block-pool-1 을 관리
NameNode2 : /namespace2 와 block-pool-2 를 관리
DataNode1 : block-pool-1, block-pool-2 를 갖음
DataNode2 : block-pool-1, block-pool-2 를 갖음
DataNode3 : block-pool-1, block-pool-2 를 갖음




federation 이 적용된 HDFS 시스템에서 사용자가 특정 데이터에 접근하기 위해 어떤 네임노드와 통신해야할까?

- 직접 찾아가는 방법:
hdfs dfs -ls / 실행시 dir 구조를 볼 수 있는데 여기서 어떤 dir 가 어떤 네임노드에 속하는지 눈으로 확인할 수 있음
hdfs getconf -namenodes 명령어로 네임노드 목록을 확인
그리고 hdfs dfs -ls hdfs://namenode1:9000/data... 이런식으로 직접 네임노드를 지정하여 접근 가능함


- 알아서 찾아주는 방법 :
View File System(ViewFS) 이라는 것이 있는데
이것을 사용하면 HDFS 가 여러 개의 네임스페이스를 하나의 논리적 파일 시스템으로 통합함
통합된 이후, 사용자가 찾는 파일 정보를 갖는 네임노드를 알아서 찾아줌

예를 들어 

<property>
    <name>fs.viewfs.mounttable.Cluster1.link./data</name>
    <value>hdfs://namenode1:9000/data</value>
</property>
<property>
    <name>fs.viewfs.mounttable.Cluster2.link./logs</name>
    <value>hdfs://namenode2:9000/logs</value>
</property>

위와 같이 viewFS 를 설정한 경우
사용자가 /data 경로에 접근하면 namenode1 과 연결되고,
/logs 경로에 접근하면 namenode2 와 연결됨
즉, 사용자가 네임노드를 찾기 위한 추가 작업을 하지 않아도 됨






secondary namenode 가 존재한다고 해도, main namenode 는 여전히 Single Point of Failure 임
네임 노드에 장애가 발생하면 HDFS 블록 위치를 알 수 없으니, 데이터를  읽고 쓰는 모든 작업이 중지됨

네임 노드에 장애가 나서 복구하게 되면, 아래 작업들 때문에 꽤 긴 시간(30분 정도)동안 HDFS 가 먹통이 됨
- 네임스페이스 이미지를 메모리에 로드
- 에디트 로그 갱신
- 전체 데이터 노드로부터 블록 리포트 받기
위 작업을 진행하는 동안 NN 는 안전모드가 활성화되어, 그 어떤 요청도 처리하지 못하게 됨

따라서 네임 노드 장애가 일어나도 가용할 수 있도록, HA 구성을 하는 게 좋음
HA 설명 : https://eyeballs.tistory.com/251

정기적인 유지 관리를 위해, active NN 와 standby NN 역할을 바꾸게 하는데 이를 우아한 장애 복구라고 함

split brain 이 나타나면, fencing 메소드를 통해 처리.
SSH 펜싱 명령어로 기존 네임 노드를 확실하게 죽임



HDFS 에서 디렉터리는 데이터 노드에는 없고 네임 노드 메모리에서만 관리됨
따라서 복제 계수도 없음

HDFS 의 권한은 다음과 같음
- 읽기 권한 : 파일 읽기, 디렉터리 내용 보기
- 쓰기 권한 : 파일 쓰기/삭제, 디렉터리 만들거나 삭제
- 실행 권한 : 하위 디렉터리 접근 (파일에 대한 실행 권한은 무시됨)

권한 검사는 클라이언트의 사용자 이름을 기준으로 검사함
슈퍼유저는 권한 검사 하지 않음



MR 프로그램은 어떠한 파일시스템도 접근 가능(HDFS, local, s3,, 등)

WebHDFS 프로토콜을 이용한 HTTP Rest API 를 사용하여 HDFS 에 접근 가능

HDFS 파일을 읽을 때 globbing 이라는 와일드카드 문자를 사용하면 편함



HDFS 파일 읽고 쓰는 구체적인 절차에 대한 설명 https://eyeballs.tistory.com/265

HDFS 에서 파일 읽기 상세
- 클라이언트는 파일의 첫 번째 블록 위치를 파악하기 위해 RPC 를 이용하여 네임노드 호출
- 네임노드는 해당 블록의 가장 가까운 복제본을 갖는 데이터노드의 주소 반환
- 클라이언트는 로컬 데이터노드에서 직접 데이터 읽음
- 다 읽었으면, 다음 블록 위치를 파악하기 위해 다시 네임노드 호출

읽다가 장애가 난다면, 다른 복제본을 갖는 데이터노드의 주소를 받고, 그 노드에서 데이터를 읽음



HDFS 에 파일 쓰기 상세
- 클라이언트는 새로운 파일을 쓰기 위해 RPC 를 이용하여 네임노드 호출
- 네임노드는 동일한 파일 체크, 권한 체크 등의 검사 진행
- 검사 통과 후, 네임노드는 데이터를 저장한 데이터노드 주소 3개(복제계수)를 반환
- 데이터 노드 목록은 파이프라인을 형성함
- 클라이언트가 (패킷화된) 데이터를 첫번째 데이터노드에 보내어 저장
- 첫번째 데이터노드가 저장한 데이터를 두번째 데이터노드로 보내어 저장 (비동기적으로 수행)
- 두번째 데이터노드가 저장한 데이터를 세번째 데이터노드로 보내어 저장 (비동기적으로 수행)

(패킷화된) 데이터를 ack 큐라는 중간 큐에 저장한 후 데이터노드들에 쓰는데,
만약 장애가 나서 쓰기 실패하면, ack 큐 내의 패킷들은 다시 데이터 앞에 붙게되어 잃어버리지 않게 됨
모든 데이터 노드로부터 ack 응답을 받으면 ack 큐 내의 데이터가 제거

파일을 쓰는 데이터노드는 다음 순서로 결정됨
1. (클라이언트가 데이터노드라면) 클라이언트 노드
1. (클라이언트가 데이터노드가 아니라면) 무작위 노드
2. 1번 노드와 다른 랙의 다른 노드
3. 2번 노드와 같은 랙의 다른 노드
4. 무작위
...

위의 전략은 다음을 충족시킴
- 신뢰성 : 블록을 두 랙에 저장
- 쓰기 대역폭 : 쓰기는 하나의 네트워크 스위치만 통과함
- 읽기 성능 : 두 랙에서 가까운 랙 선택
- 블록 분산



HDFS 는 클러스터 전반에 걸쳐 파일 블록이 고르게 분산되었을 때 가장 잘 동작
만약 데이터가 한 노드에 몰려있다면, 처리 부하가 한 노드로 몰릴 수 있기 때문

오래 사용된 hadoop 시스템의 경우 고르게 분산되지 못하는 경우가 발생하며
이를 위해 background 에서 데이터를 모든 DN 에 넓게 분산되도록 도와주는 balancer 가 존재함
설명  : https://eyeballs.tistory.com/280



RPC (Remote Procedure Call ) 란, 네트워크를 통해 원격 시스템에 있는 함수(프로시저)를 호출할 수 있도록 하는 통신방식
네트워크로 연결된 다른 서버 시스템의 함수를 호출하고 그 결과를 반환받을 수 있도록 해주는 기술.


Client 가 원격 함수 호출 RPC 를 요청하면
요청이 직렬화되어 네트워크를 통해 대상 서버로 전달됨
서버는 받은 요청을 역직렬화하고 함수를 실행
실행된 결과를 직렬화하여 Client 에게 반환하고
Client는 결과를 받아서 사용함
여기서 Client 는 마치 local 함수를 사용하는 것 처럼 RPC 를 사용함.


 

 

4.


https://eyeballs.tistory.com/m/263

YARN 은 클러스터 자원을 요청하고 사용하기 위한 API 를 제공
하지만 사용자가 해당 API 를 직접 이용하진 못 함

YARN 은 리소스 매니저와 노드 매니저 두 가지 유형의 장기 실행 데몬으로 구성됨
- 리소스 매니저 : 클러스터 전체 자원 사용량 관리
- 노드 매니저 : 컨테이너(cgroup=docker)를 구동하고 및 모니터링

YARN 자체는 클라이언트, 마스터, 프로세스 같은 애플리케이션들이 서로 통신하는 기능 제공하지 않음
(= YARN 을 이용하는 앱들이 실행한 프로그램 간 통신은 앱이 알아서 하고, YARN 이 그 통신을 도와주진 않음)
YARN 위에서 동작하는 주 애플리케이션(Spark, Tez 등)들은 주로 RPC 를 이용하여 상태 변경 전달하고 클라이언트로부터 결과 받으나
이건 뭐.. 애플리케이션마다 다를 수 있음..

YARN 위에서 동작하는 애플리케이션들은 어느 때나 자원을 요청할 수 있음
처음 실행할 때 모든 자원을 요청하는 앱은 Spark 등이 있고
실행하다가 자원이 부족하다 싶으면 RM 에게 요청하여 자원을 추가로 더 받아내는 앱은 mapreduce 등이 있음



YARN 에서 애플리케이션 구동하는 절차
- 클라이언트는 리소스 매니저에게 애플리케이션 마스터 프로세스 구동 요청
- AM 를 시작할 노드 매니저 찾고, 그 위에서 AM 을 실행
- AM 에서 필요한 자원을 RM 에 요청
- RM 는 리소스와 함께 지역성 이득을 받을 수 있는 NM 을 반환
- AM 이 NM 에게 컨테이너 구동 명령 후, NM 이 컨테이너 구동하고 task 실행

AM 이 노드매니저 위에 있지 않고, 클러스터 바깥에 존재할 수 있음
이를 비관리 애플리케이션 마스터라고 부름

AM 은 어느 때나 RM 에게 자원 요청이 가능



YARN 스케줄러 (FIFO, Capacity, Fair 설명) : https://eyeballs.tistory.com/257

큐 탄력성 : 캐퍼시티 스케줄러 사용시 사용 가능
  일하는 큐와 놀고있는 큐가 존재할 때, 일하는 큐가 놀고있는 큐의 자원을 가져다가 사용 가능

페어 스케줄러 : 실행 중인 모든 애플리케이션에 동일하게 자원을 할당하지만,
  실제로 자원은 사용자 사이에서만 균등하게 공유됨 (큐 이름이 따로 명시되지 않으면, 기본적으로 사용자 이름으로 된 큐가 생성됨)
  사용자 A, B 가 각자 큐를 하나씩 갖고 있고, A사용자가 job을 1개(job1), B사용자가 job을 1개(job2) 실행하면
  job1과 job2 는 전체 클러스터 자원을 반반씩, 50%씩 차지함
  이 때 B가 job을 하나 더 실행(job3) 했을 때, job1, job2, job3이 전체 자원을 각각 33%씩 나눠갖는게 아님
  사용자 B가 갖는 자원에서 job2, job3이 자원을 균등하게 나눠갖게 되어
  결과적으로 job1 이 전체 클러스터의 50%, job2 가 25%, job3이 25% (25+25= 50 B 사용자의 자원) 을 차지하게 됨
  위의 예는 아무 설정을 하지 않았을 때(=균등한 비율로 자원 차지)이고, 각 큐마다 자원을 차지하는 비율을 정할 수 있음
  추가로, 각 큐 내에 job들이 어떻게 실행될지 (fair 로 실행될지, capacity로 실행될지, FIFO일지...) 설정 가능

선점 preemption : fair scheduler 에서 지원하는 기능.
  자원 균등 공유에 위배되는 큐에서 실행되는 컨테이너를 (자원을 얻기 위해) 죽일 수 있도록 허용하는 기능
  죽은 컨테이너는 다시 수행되어야하기에 전체 효율이 떨어짐

YARN 의 모든 스케줄러지역성 요청가장 우선시함
그래서 지연 스케줄링을 통해 가능하면 자원을 로컬로 사용 가능한 컨테이너에서 job 을 실행하려고 노력함
지연 스케줄링 : 지역성 요청을 위해 조금 늦게 컨테이너를 할당하는 것
  NM 가 RM 에게 보내는 하트비트를 통해, 해당 노드의 가용 자원 정보와 실행중인 컨테이너 정보를 알 수 있음
  그래서 heartbeat 는 애플리케이션이 실행할 컨테이너를 얻을 수 있는 스케줄링의 기회가 됨..!

RM 가 NM의 자원을 계산할 때는 메모리만 고려함 (CPU 는 고려하지 않음)
우성 자원 공평성이 true 가 되면 CPU 까지 고려됨



YARN 에서 동작하던 job, NM, AM, RM 등 이 죽었을 경우 : https://eyeballs.tistory.com/263



DRF (Dominant Resource Fairness) 는 클러스터의 리소스(CPU, Memory)를 공정하게 분배하는 스케줄링 알고리즘
어떤 job 에서 (전체 CPU 대비) CPU 를 더 많이 사용하는지, (전체 Memory 대비) Memory 를 더 많이 사용하는지에 따라
"우성 자원"이 정해지고, 각 job 들의 우성자원 비율에 따라 리소스를 할당받는 컨테이너의 양이 달라짐

예를 들어, 클러스터에 가용한 CPU 100cores, Memory 10000 GB가 존재할 때
Job A 가 제출되면서 CPU 2cores, Memory 300 GB 를 요구했고
동시에 Job B 가 제출되면서 CPU 6cores, Memory 100 GB 를 요구함

이 때 각 jobs 의 우성자원은 ('전체 리소스 대비'로 계산하여)

Job A CPU : 2/100 = 2%
Job A Memory : 300/10000 = 3%
즉 Job A 의 우성자원은 Memory (3%)

Job B CPU : 6/100 = 6%
Job B Memory : 100/10000 = 1%
즉 Job B 의 우성자원은 CPU (6%)

이렇게 제출된 Job 들은 컨테이너에에서 실행되니까
Job A, B 를 몇 개의 컨테이너에 넣어야 할 지 정해야하고, 이것을 정하기 위해 DRF 가 개입하게 됨

Job A 가 실행될 컨테이너 개수를 x 라고 하고,
Job B 가 실핼될 컨테이너 개수를 y 라고 하자.

Job A 가 실행될 x 개의 컨테이너에는 총 2x cores, 300x GB 만큼의 리소스가 할당됨.
Job B 가 실행될 y 개의 컨테이너에는 총 6y cores, 100y GB 만큼의 리소스가 할당됨.

여기서 Job A 의 우성자원은 Memory (300x / 10000GB)
Job B 의 우성자원은 CPU (6y / 100 cores)
그리고 DRF 는 위의 두 식의 비율을 동일하게 만들도록 x, y를 찾음 (300x / 10000) = (6y / 100)
그래서 x = 2y 라는 결과가 도출됨

(클러스터 자원을 최대한 활용해야하기 때문에) x= 20, y= 10 을 넣어서 비율을 맞춰줌
(CPU : 2*20 + 6*10 = 100 <= total 100cores
Memory : 300*20 + 100*10 = 7000 < total 10000GB)

결과적으로 Job A 는 20개의 컨테이너에 할당되어 실행되고
Job B 는 10개의 컨테이너에 할당되어 실행됨

YARN 이 이렇게 어렵게 산단다....



 

5.


HDFS 는 데이터 쓸 때는 Data Node가 32bit 체크섬(CRC-32c)을 계산하여 함께 저장
데이터를 읽을 때는 클라이언트가 (DN 에 저장된 체크섬과 수신한 데이터로 계산한) 두 개의 체크섬으로 무결성 확인
체크섬은 크기가 작아서 손상될 가능성이 매우 낮음
또한, 체크섬 크기 자체가 하도 작아서 스토리지 오버헤드도 1%가 안 됨

이렇게 데이터를 쓰고 읽을 때 체크섬을 이용하여 데이터 무결성 검사를 함
데이터 노드에서 체크섬을 이용한 무결성 검사에 실패하면 클라이언트에 IOException 을 주고, 그 뒤에 추가 작업(재연산 등) 진행

각 데이터노드는 체크섬 검증 로그를 영구 저장함
클라이언트가 데이터를 읽을 때  체크섬 확인을 진행하고 성공적으로 마치면, 그 검증 로그를 갱신하여 최신 날짜를 기록

DataBlockScanner : 3주에 한 번씩 전체 데이터 블럭의 체크섬을 검증하는 백그라운드 스레드



데이터를 HDFS 에 저장할 때 데이터를 압축하면 크기가 줄어들기 때문에
저장공간을 아낄 수 있고 데이터 전송 시간을 단축시킬 수 있음

데이터가 압축되어 HDFS 에 저장될 때, 압축된 파일도 block size 만큼 나뉘어져서 각 노드에 저장됨
이 때 분할을 지원하는 압축으로 압축되었다면, 흩어진 block 들은 각각의 노드에서 처리 될 수 있음(지역 이득을 봄)
분할을 지원하지 않는 압축으로 압축되었다면, 흩어진 block 들을 하나의 노드에서 모두 가져와서 처리해야 함
그래서 네트워크 이동이 발생하게 되고, 처리 시간도 오래 걸릴 수 있음..




직렬화 : 네트워크 전송을 위해 구조화된 객체바이트 스트림으로 전환
역직렬화 : 바이트 스트림을 일련의 구조화된 객체로 역전환

하둡 시스템에서 노드 사이의 프로세스 간 통신RPC 를 사용하여 구현
(=YARN 위에서 동작하는 applications들, 이를 테면 spark 등의 app 들이 실행한 jobs 간에 통신에 RPC 가 사용됨)
그리고 RPC는 직렬화 된 후 네트워크를 타고 전송됨

직렬화 설명 : https://eyeballs.tistory.com/206

하둡에서 사용하는 직렬화 방법은 Writable(Serialization), 그리고 Avro 등이 있음



sequence file : https://eyeballs.tistory.com/487



행 기반 파일 포맷 : Sequence file, Map file, Avro
  한 행에서 여러 컬럼에 접근할 때 잘 동작
  쓰기 작업 실패시 마지막 동기화 포인트까지 데이터 읽기 가능

컬럼 기반 파일 포맷 : Parquet, Orc
  소수의 컬럼만 접근할 때 잘 동작
  압축 효율이 좋음
  IO 할 때 더 많은 메모리가 필요
  IO 한 데이터를 각 행으로 분리하는 버퍼가 추가로 필요하기 때문



parquet 파일 포맷은 columnar (열) 기반 저장 방식을  사용하는 파일 포맷
Spark, Hadoop, Hive 등에 널리 사용됨

컬럼 기준으로 데이터가 저장장치 내 연속적인 위치에 저장되기 때문에 IO 성능이 좋아지고,
특정 컬럼에 접근할 때 성능이 향상됨
컬럼 단위로 모여 저장되어있기 때문에 압축률도 높음
데이터는 row group 단위로 저장됨

Parquet 은 데이터를 저장할 때 Schema(컬럼명, 데이터 타입 등)를 포함해서 저장
각 컬럼의 타입 정보를 저장해서 타입 안정성을 보장함
물론 이 Schema 정보 업데이트도 가능함
Spark 에서 Parquet 파일을 읽을 때, Spark 가 읽기 원하는 파일의 Schema 를 확인하고 필요한 부분만 읽게됨
따라서 빠르게 데이터에 접근이 가능함

크기가 큰 Parquet 파일을 여러 청크로 나눠서 병렬 처리가 가능
이런 특징을 이용하여, Spark, MapReduce 등의 병렬처리 프레임워크에서 사용하기 좋음

또한, Parquet 파일에 조건을 걸고 읽을 때, 그 조건을 미리 parquet 에서 처리해주는 Predicate Pushdown 기능을 지원함
즉, Spark 등에서 Parquet 파일을 읽기 전에 특정 컬럼의 필터링이 이미 적용되어 읽혀짐
그래서 IO 비용이 절감됨
이게 어떻게 가능하냐? 바로 위에서 설명한 Schema 를 이용함.
파일을 읽을 때, 메타데이터(Schema)를 먼저 확인하고 필요한 데이터만 읽는 방식으로 Pushdown 기능이 지원됨
예를 들어, age >= 30 이라는 조건으로 Parquet 파일을 읽는다고 하자
Parquet 파일 내부에 row group 단위로 min/max 값을 미리 계산하여 저장해두고 있는데
이 min, max 값을 이용하여 30 미만인 row group 은 아예 읽지 않고
30 이상인 row group 만 읽음 (=Predicate Pushdown 기능이 됨)


Parquet 파일 내에 중복되는 값을 dictionary 로 만들어 중복 제거하여, 실제 크기를 줄여주고
Index 도 지원해서 검색 성능도 높아짐! 아주 괜찮은 포맷이구먼!!!

단점이 하나 있음
Parquet 포맷으로 데이터를 쓸 때, (Sequence File 등) 다른 포맷에 비해 느림

결론!
Spar, Hive 등의 분산 병렬 처리 플랫폼에서 대량의 데이터를 읽거나
컬럼 단위 연산을 많이 할 때 Parquet 을 사용!




Orc (Optimized Row Columnar) 파일 포맷은 Hadoop, Spark, Hive 에서 사용되는 Columnar 기반 저장 포맷
Parquet 과 유사하지만, Hive 에 최적화된 구조와 압축 기법을 제공

컬럼 기반으로 저장되기 때문에 오는 장점 및 단점은 Parquet 과 동일함

Orc 파일은 Stripe 단위로 데이터를 저장하며,
각 Stripe 에는 메타데이터, 데이터 블록, 인덱스 블록이 포함됨
그리고 이를 통해 Predicate Pushdown 도 지원 가능 (Parquet 과 동일하게 schema 를 먼저 조사하는 방식으로)
(Orc 역시 내부에 Min, Max 를 저장하고 있기 때문에 가능함)

Orc 는 Parquet 보다 읽기/쓰기 속도가 빠르고 압축률이 더 좋음
하지만 Orc 는 Hive 에 최적화되어 있고, Parquet 은 다양한 엔진에서 사용 가능
Hive 에서 성능 향상을 위해 사용하는 "Vectorized Query Execution" 기능은 Orc 를 사용할때만 제공됨
(Parquet 은 제공하지 않음)
"Vectorized Query Execution" 는 (row 단위가 아니라) 컬럼 단위로 벡터 연산을 수행하는 기능이라고 함...

Spark에서 ORC를 사용할 수 있지만, 보통 Spark에서는 Parquet이 더 많이 사용됨

Orc 가 Hive 에 최적화되어있다고는 하지만 구체적으로 어떻게 최적화 되어있다는 것인지는 궁금하니 더 찾아봐야겠음



 

 

6.


하둡은 클라이언트에서 whoami 명령어의 결과인 사용자에 대해 HDFS 권한을 확인함
(그룹명 확인은 groups)

하둡 웹 인터페이스 실행 사용자명은 dr.who 임
따라서 웹 인터페이스를 통해 시스템 파일에 접근 불가 (왜냐면 권한이 없기 때문)



시스템 데몬 로그 : 각 하둡 데몬은 (log4j 를 사용하여) 표준 출력과 에러를 저장한 로그 생성
  HADOOP_LOG_DIR 환경변수에 지정된 dir 에 생성

맵리듀스 잡 히스토리 : 잡을 실행하는 과정에서 발생하는 이벤트(task 완료 등) 로그
  HDFS 내의 한 곳에 모임

맵리듀스 태스크 로그 : 각 태스크 자식 프로세스가 (log4j 를 사용하여) 표준 출력 표준 에러 로그 생성
  YARN_LOG_DIR 환경변수에 지정된 dir 에 생성


 

 

7.


맵리듀스 작동 방법 : job 제출

- 리소스 매니저에 (맵리듀스 잡 ID 로 사용될) 애플리케이션 ID 요청 
- 클라이언트는, 잡 실행에 필요한 잡 리소스(jar 파일, 환경 설정 파일, 미리 계산된 입력 스플릿 등)를
  HDFS 같은 공유 파일 시스템 내 잡 ID 이름의 디렉터리에 복사
  HDFS 복제계수는 10으로 아주 높은데, 높은 이유는 노드 매니저가 잡 태스크를 실행할 때 접근성을 높이기 위해
- 리소스 매니저에게 잡 제출

잡을 제출하면, 잡 리소스가 HDFS 에 저장된 상황이 됨



맵리듀스 작동 방법 : job 초기화

- RM 가 노드 매니저 위에 컨테이너 하나 할당하고 그 위에서 AM 실행
- AM 은 (클라이언트가 HDFS 에 입력한 job 리소스로부터) 입력 스플릿 정보를 읽음
- 입력 스플릿별로 맵 태스크 객체 생성 (이 때 각 태스크의 ID 가 부여됨)
- 만약 job 이 작아서 AM 실행되는 노드 매니저위에서 태스크 실행이 가능하면, 우버 태스크로 실행
  job 이 작다는 것은 HDFS 블록 하나보다 입력이 작은 경우

job 이 초기화되면, AM 이 실행되고 맵 태스크를 실행할 객체가 생성된 상황이 됨


맵리듀스 작동 방법 : task 할당

- AM 은 RM 에게 잡의 모든 맵과 리듀스 태스크를 위한 컨테이너 요청 (CPU, Memory 등)
  우선순위는 (리듀스 태스크보다) 맵 태스크가 높음 (Map Tasks > Reduce Tasks)
- 맵 태스크는 데이터 지역성을 고려한 노드 매니저 위에서 실행

지역성이 고려된 노드 매니저 위에 task 가 할당되고, task 가 실행될 container 까지 마련됨


맵리듀스 작동 방법 : task 실행

- RM 가 특정 노드상의 Container 를 위한 리소스를 태스크에 할당하면
- (AM 에 의해) NM 위에서 컨테이너가 시작됨
- 각 태스크는 자바 애플리케이션으로 실행(=JVM 이 뜨고 그 위에서 Task 가 실행된다는 말)
- 태스크 실행 전 job 리소스(jar 파일 등)를 HDFS 로부터 컨테이너 local 로 가져옴

태스크는 독립된 JVM 위에서 동작하기 때문에
(태스크의 버그, 강제 종료, hang 등으로) 노드 매니저에게 영향을 주지 않음

리소스를 할당받은 Container 내에 JVM 에서 Task 가 실행됨!


맵리듀스 작동 방법 : 진행 상황과 상태 갱신

- 하둡은 진행 중인 태스크는 실패로 보지 않기 때문에 진행 상황 보는 것이 아주 중요
- 사용자는 콘솔 화면에서 map, reduce 의 처리 비율을 볼 수 있음
- task 는 AM 에게 3초마다 진행 상황과 상태 정보를 집계하여 보고
- AM 에 보고된 진행 정보는 RM 의 WebUI 를 통해 사용자에게 보여짐

Task 가 실행되면서 AM 에게 상태를 보고하고, 보고된 상태 내용은 RM 의 WebUI 를 통해 확인이 가능



맵리듀스 작동 방법 : job 완료

- AM 이 task 로부터 작업 마무리 되었다는 보고 받으면 job 상태를 '성공' 으로 변경
- 사용자에게 통지할 메세지 출력

클라이언트는 mr 작업이 마무리 된 것을 HTTP job 통지를 통해 callback 받을 수 도 있음


참고로 waitForCompletion()은 MapReduce Job을 실행하고 완료될 때까지 대기하는 역할
즉, 사용자가 제출한 MR job 이 끝날 때까지 Blocking 상태(다음 코드 실행 멈춤)로 기다림(=동기 방식)
Non-Blocking(비동기 방식)을 사용하려면 그냥 submit() 을 사용하여 Job 실행하면 됨



https://eyeballs.tistory.com/263

사용자 코드 이슈로 태스크 실패
- AM 에게 실패 보고
- 사용자 로그에 기록
- task 컨테이너 자원 풀어줌

hang 에 걸린 태스크
- 10분 이상 task 보고를 받지 못한 AM 은 해당 task 를 실패로 간주. 강제 종료
- 다른 노드 매니저에 동일한 task 재실행
- 같은 이슈가 4번 반복되면 job 실패

애플리케이션 마스터 실패
- AM 이 주기적으로 RM 에게 보내는 하트비트가 끊기는 경우
- AM 실패시 다른 노드 매니저에서 AM 을 재실행
- 같은 이슈가 2번 반복되면 job 실패

노드 매니저 실패
- NM 가 주기적으로 RM 에게 보내는 하트비트가 끊기는 경우
- RM 은, 실패한 NM 에 컨테이너 할당하지 않도록 노드 풀에서 제거
- 실패한 NM 에 AM 이나 task 실행시, 위의 절차대로 AM 와 task 재실행

리소스 매니저 실패
- RM 이 실패하면 모든 실행중인 잡이 실패함
- HA 구성하여 RM 복구 가능 (RM 에 Failover Controller 가 포함되어있음 : https://eyeballs.tistory.com/251)
- 실행중인 애플리케이션에 대한 모든 정보zookeeper 혹은 HDFS 에 보관되기에
  standby RM 가 active RM 이 된 후에도 핵심 상태 복구 가능
- RM 이 재실행되면 모든 AM 이 재실행 됨
- RM 이 재실행되면, NM 와 Client 는 round robin 방식으로 Active RM 을 직접 찾음



맵 부분

- 처리 결과는 환형 구조의 인메모리 버퍼에 씀
- 메모리가 꽉차면, 메모리 내에서 리듀서 개수에 맞게 파티션 나누고 정렬하고, 정렬된 채로 disk 에 spill
- map 결과는 여러 spill 의 병합(파티셔닝은 아직 되어있음)
- 3개 이상의 spill 인 경우 컴바이너가 실행됨

map 의 결과는 job 이 끝날때까지 사라지지 않음
왜냐면 reducer 가 복사해가야하는데 실패할 수 있으니까



리듀스 부분

- HTTP 를 통해 map 의 결과 중 자신에게 맞는 파티션 부분을 복사해서 가져옴
- map 을 어떻게 찾는가? AM 에게 물어봐서 map 위치 찾음
- 리듀스 disk 에 map 의 여러 결과들이 축적되면 더 크고 정렬된(정렬 순서 유지) 하나의 형태로 병합 (round 단위로 병합)
- 하나로 병합된 후 리듀스의 입력으로 들어감



투기적 실행 (Speculative Execution)
- 하나의 느린 태스크가 전체 job 수행 시간을 지연시킬 수 있음
- 느린 태스크를 감지하면, 또 다른 동일한 태스크를 실행
- 둘 중 하나가 먼저 끝나면, 끝나지 못한 다른 태스크는 강제 종료

투기적 실행의 목적잡 실행 시간을 줄이는 것이지만
클러스터 효율성 측면에서 비용이 발생하고 전반적인 단위 시간당 산출물이 줄어듦
그래서 투기적 실행은 안 하는 게 좋음
(리듀스가 투기적 실행을 해버리면, 셔플을 중복으로 하기 때문에 네트워크 트래픽을 심각하게 증가시킴...)


 

 

8.


입력 스플릿
https://eyeballs.tistory.com/281
https://eyeballs.tistory.com/264

입력 스플릿은 논리적인 개념임
입력 스플릿은 테이블(읽은 데이터)의 특정 범위에 있는 행의 집합이며, 레코드는 해당 범위에 속한 행을 의미함
MR job 실행하여 데이터를 읽으면, 읽은 데이터는 입력 스플릿으로 나눠짐
나눠진 입력 스플릿은 맵에서 처리함. (하나의 맵이 하나의 입력 스플릿을 입력으로 받고 처리)
입력 스플릿은 방대한 데이터를 병렬처리하기 위해 각 맵에서 읽는 단위임

입력 스플릿이 논리적인 개념이라면, HDFS block 은 물리적인 개념임.
실제로 데이터가 물리적으로 block size 만큼 나눠져서 여러 서버에 저장됨

대개 입력스플릿과 HDFS block size 는 동일하게 설정함
간혹 서로 동일하게 설정해도 입력스플릿이 커지는 경우가 존재함.
예를 들어, Text File 에서, 한 레코드가 너무 길어서 (130MB라고 하자) block size(128MB) 를 넘어버린 경우
입력 스플릿은 block size 를 넘은 130MB 가 되고
이 130MB 짜리 단위의 데이터(입력스플릿)을 맵 하나에서 처리하게 됨
130MB 로 늘려서 행을 유지했으므로(행이 유실되거나 끊어지지 않음) 데이터에 손실은 없음
130MB 를 처리하는 맵에서 2MB 정도(130-128)의 데이터를 다른 block 에서 가져와서(=네트워크 비용 발생) 처리함

HDFS block 은 데이터를 물리적으로 나누되, (입력 스플릿과 달리) 파일을 byte 단위로 나눠서 저장함
따라서, 한 행(레코드)가 아주 길어서 block size 를 넘어가는 130mb 짜리 행을 갖는 데이터가 들어온다고 해도
행의 경계를 고려하지 않고 데이터를 128mb, 2mb 짜리 블록으로 나눠서 저장함
입력 스플릿은 행 단위로 데이터를 잡기 때문에, (위와 같이 두 개로 나눠진) 130mb 를 하나의 입력 스플릿으로 잡음
이 때 레코드 리더행이 중간에 잘린 경우 자동으로 다음 블록에서 나머지를 읽도록 도와줌
예를 들어, 레코드 리더가 첫번째 블록(128mb)을 읽었는데 해당 데이터의 마지막이 중간에 잘렸다면
해당 블록에서는 이 줄을 완전한 행으로 만들지 않음
그리고 두번째 블록(2mb)을 읽을 때, 이전 블록에서 중간에 잘린 행을 감지하고 두 번째 블록에서 이어지는 데이터를 가져와 완전한 행을 만듦
즉, 레코드 리더는 행의 경계를 인식하며, 중간에 잘린 데이터가 있으면 다음 블록에서 이어지는 부분을 가져와 하나의 완전한 행으로 조립




 

 

10.


데이터 노드에 RAID 를 적용하여도 이득은 거의 없음
- RAID 가 제공하는 데이터 중복성을 HDFS 가 이미 갖고 있기 때문
- RAID 에 데이터를 쓸 때 가장 느린 디스크에 의해 속도가 결정남
  HDFS 에 데이터를 쓸 때 병렬로 쓰기 때문에 가장 느린 디스크보다 속도가 빠름
- RAID 디스크가 고장나면 전체가 고장날 수 있음

단, 네임 노드에 RAID 적용은 이득이 있음



마스터 노드인 네임 노드, 리소스 매니저 등의 마스터 데몬은 각기 다른 서버에서 동작하도록 하는 게 좋음
네임 노드와 보조 네임 노드 역시 각기 다른 서버에서 동작하도록 함
데이터 노드, 노드 매니저들은 마스터 노드와 다른 서버들 위에서 함께 동작

이렇게 마스터 노드를 분리하는 이유 고가용성 HA 때문
active 가 죽는 경우 standby 가 일 해야하기 때문




네트워크 거리 계산을 위한, 네트워크 토폴로지 설정 정보는 관리자가 직접 넣음
알아서 계산하지 않음



아래는 하둡 설치 절차

hadoop 설치할 때는 /usr/local 혹은 /opt 에 설치

hadoop 데몬 실행시 하둡 전용 사용자 계정(hdfs, mapred, yarn 등)을 생성하는 것이 좋음
다른 서비스와 하둡 프로세스를 구분하기 위함

제어 스크립트(start-dfs.sh 등)는 SSH 를 이용하여 전체 클러스터에 작업 수행
따라서 SSH 접근시 비밀번호가 없어야 함
제어 스크립트를 사용하는 것은 선택사항

HDFS 처음 설치시, 파일 시스템 포맷 진행이 반드시 필요
포맷 작업새로운 빈 파일시스템을 생성하는 것
데이터 노드는 동적으로 클러스터에 포함/제외 될 수 있기에 포맷 작업은 오로지 네임 노드가 진행

workers 파일에 데이터 노드와 노드 매니저가 동작할 워커 노드들의 ip 넣음

start-dfs.sh 실행시, 네임 노드hdfs getconf -namenodes 명령어의 결과로 나온 노드들에서 실행됨
리소스 매니저start-yarn.sh 명령어가 실행된 노드에서 실행



하둡의 각 노드들자신만의 환경 설정 파일을 갖고 있음
따라서 관리자는 이 파일들을 전체 클러스터에 동기화 해야 함
왜냐면 모든 파일이 동일한 내용을 갖어야 하기 때문
좋은 성능의 노드가 들어와도 동일한 환경 설정 파일을 활용해야 함(...)



하둡의 각 데몬 당 1gb 의 메모리가 기본으로 할당됨
hadoop-env.sh 의 HADOOP_NAMENODE_OPTS 속성에 JVM 옵션으로 메모리 용량 설정하면
다른 하둡 데몬에 영향 없이 Name Node 의 메모리 용량 증가 가능

로그 파일 default 는 $HADOOP_HOME/logs
대개 /var/log/hadoop 로 변경하는데, 이유는 hadoop home 이 바뀌어도 동일한 곳에 로그를 유지하기 위함

하둡 로그는 두 가지
.log : log4j 통해서 생성. 애플리케이션 로그 메세지. 알아서 지워지지 않음
.out : 표준 출력, 표준 에러. 최근 5개만 로그만 보관됨

하둡 데몬은 데몬 사이 통신을 위한 RPC, WebUI 제공을 위한 HTTP 모두 실행
추가적으로 데이터 노드 블록 전송을 위해 TCP/IP 실행

기본적으로 데이터 노드는 저장된 저장 디렉터리 가용한 모든 공간을 사용하려고 함

단락 지역 읽기
- 블록이 있는 노드에서 클라이언트가 동작한다면, TCP/IP 없이 곧바로 블록 읽을 수 있음


 

11.


클라이언트가 HDFS 에 업데이트를 하면,
업데이트는 local disk 의 edit log 에 기록된 후 메모리상의 메타데이터가 변경

fsimage 는 메타데이터의 영속적인 체크포인트
업데이트 내역은 fsimage 에 곧바로 쓰이지 않는데, 왜냐면 크기가 많이 늘어나 성능이 안 좋아지기 때문

네임 노드 장애 발생시, fsimage 를 메모리에 로드하고
edit log 에서 특정 지점 이후 발생한 변경 내역들을 메모리에 반영

edit log 역시 크기가 커질 수 있음
따라서 보조 네임노드를 이용하여 fsimage 와 edit log 를 병합
- 보조 네임노드는 네임노드로부터 fsimage 와 edit log 를 가져감
- 그 동안 네임노드는 새로운 edit log 를 사용
- 보조 네임노드는 fsimage 와 edit log 를 병합
- 병합한 fsimage 를 네임노드에게 줌



네임노드 시작시, fsimage 를 메모리에 로드하고
edit log 변경 내역들을 메모리에 반영하고
데이터 노드들로부터 파일 위치 등을 보고받음
이 작업을 하는 동안 안전 모드가 켜지며, 쓰기는 금지되고 읽기만 가능


dfsadmin : HDFS 의 상태 정보 확인 및 다양한 관리 작업 수행 도구

fsck : HDFS 에 저장된 파일 상태 점검 도구
- 초과 복제 블록 : 초과된 만큼 삭제
- 복제 기준 미만 블록 : 미만인 만큼 복제
- 잘못 분배된 블록 : 한 랙에 모든 블록이 있는 경우. 다시 넓게 퍼뜨림
- 모든 복제본이 손상된 블록 : 놔두거나 lost+found 로 옮김
- 존재하지 않는 블록 : 놔두거나 lost+found 로 옮김

블록 스캐너 : 데이터 노드에 저장된 모든 블록을 3주마다 점검. 모든 데이터 노드가 실행

diskbalancer : 하나의 데이터 노드여러개의 disks 가 있는 경우, disks 간 데이터 불균형이 일어났을 때 불균형을 해소시켜줌
balancer : 클러스터에서(여러 데이터노드 간) 데이터 불균형이 일어났을 때 불균형을 해소시켜줌. 
https://eyeballs.tistory.com/280



메트릭 : 하둡 데몬이 수집. 관리자를 위한 정보
카운터 : MR task 가 수집. MR 사용자를 위한 정보

아래는 최적화 내용

< 메모리 >
MR 작업은 CPU 보다 메모리 자원에 더 민감함. 실제로 MR 작업시 메모리를 많이 사용
스왑 메모리 사용량을 모니터링하고 메모리 옵션값을 조절하여 최대한 swap out 이 발생하지 않도록 함

< IO >
file/dir 에 접근 할 때마다 시간 기록을 하지 않도록 noatime 옵션으로 구성
복제 등은 이미 HDFS 에서 제공하고 있으므로 추가 복제를 일으키는 RAID를 사용하지 않음

< 네트워크 >
컴바이너를 꼭 사용하여 셔플되는 데이터 양을 줄여 네트워크 오버헤드 줄임
대용량의 매퍼 결과값을, Snappy 등을 사용하여 압축하는 것으로 네트워크 오버헤드를 줄일 수 있음

< 기타 >
JVM GC 실행 횟수 등을 모니터링 하면서 GC 튜닝을 한다던가 코드에서 쓸데없이 낭비되는 메모리를 줄이는 방향으로 한다던가 함

참고 [링크1] [링크2]


 

 

Zookeeper


주키퍼란, 분산 작업을 제어하기 위한 트리 형태데이터 저장소

노드 종류는 세 가지
- 영구 노드(Permanent Node) : 한 번 저장한 데이터가 영구적으로 유지되는 노드
- 임시 노드(Ephemeral Node) : 클라이언트 세션이 유효한 동안만 살아있는 노드
- 순차 노드(Sequence Node) : 저장하는 순서에 따라 자동으로 일련번호가 붙는 노드

주키퍼는 빠른 처리를 위해 모든 트리 노드를 메모리에 올려놓고 처리하지만
Memcached, Redis 와는 다르게 사용됨

주키퍼에서 제공하는 기능 중, 데이터 변경을 감시하여 콜백을 실행하는 감시자(Watcher)가 있음
감시자를 트리 내의 특정 노드에 등록하면,
별도로 데이터 폴링을 하지 않더라도 노드 변경 사항을 전달받을 수 있음

주키퍼 클러스터(앙상블)는 한 대의 leader 와 나머지 follower 로 구성
클라이언트의 읽기 요청은 leader, follower 모두 수행 가능
클라이언트의 쓰기 요청은 leader 만 가능
쓰기 요청을 받은 follower 가 leader 에게 요청을 넘기고,
leader 는 follower 들로부터 쓰기 ack 응답을 요청
과반수 이상의 ack 응답이 오면 follower 들에게 쓰기 진행

NN 같이 분산 처리 제어를 위해 사용되는 주키퍼는 소모하는 하드웨어 자원이 굉장히 적음
multi tenant 기능이 존재함
클라이언트 세션을 생성할 때 특정 노드 경로를 지정하면, 이후 접근은 이 경로 하위만을 사용함
즉, 클라이언트마다 각기 다른 경로를 주고 그 경로만 사용하도록 함

클라이언트 계정마다 각기 다르게 노드를 사용할 수 있도록 ACL(Access Control List) 지정 가능
상위 노드에 ACL 이 걸려있어도 하위 노드에 ACL 이 자동으로 붙지 않음
모든 하위 노드에 ACL을 적용해야 하는데 좋은 방법이 아님
따라서 주키퍼를 공유 서버 형태로 제공하기 위험함




참고 https://d2.naver.com/helloworld/583580



리더 선출 기능
- 주키퍼에 리더 선출을 위한 path 생성 (예를 들면 /election)
- 해당 path 이하 SEQUENCE| EPHEMERAL 플래그를 줌
- 서버 (혹은 프로세스) 가 붙을 때마다 순차적으로 값이 붙음 (sequence flag 때문)
- 생성된 znode 들 중, 리더는 항상 가장 작은 값을 갖는 znode
- 가장 작은 znode 값에 감시자가 붙음
- 리더가 죽으면 감시자가 알게되고, 그 다음 작은 값을 갖는 znode 가 리더가 됨

참고 https://zookeeper.apache.org/doc/current/recipes.html

분산락 기능
- 주키퍼에 분산락 기능을 위한 path 생성 예를 들면 /lock)
- 해당 path 이하 SEQUENCE| EPHEMERAL 플래그를 줌
- 서버 (혹은 프로세스) 가 붙을 때마다 순차적으로 값이 붙음 (sequence flag 때문)
- lock 을 원하는 서버들이 getchildren 함수를 사용하여 /lock 이하 노드들을 받음
- 가장 낮은 znode 를 갖는 노드를 받은 서버가 lock 을 가져감
- 나머지 서버들은 기다림
- lock 을 해제할때는 lock 을 가져간 서버에서 가장 낮은 znode 를 삭제
- 그 다음 낮은 znode 를 가져간 서버가 lock 을 가져감...

참고 http://think-in-programer.blogspot.com/2011/01/5-zookeeper.html

분산처리 시스템에서 ACID 속성중 정합성(Consistency) 독립성(Isolation)을 보장하는것은 쉽지 않다. 
주키퍼는 데이터 저장시 아래와 같은 사항을 보장한다.
- 순차적 정합성(Sequential Consistency) : 주키퍼 클러스터에 저장되는 데이터는 강한 정합성(Strong Consistency)를 보장하지 않고, 이벤추얼 정합성(Eventual Consistency)을 보장한다. 이벤추얼 정합성은 일정 시간이 지나면 정합성이 맞춰지는 속성이다.
특정 클라이언트로부터 데이터 저장에 대한 요청이 있을때, 분산되어 있는 주키퍼 서버에 반영되는 순서는, 클라이언트에서 전송된 요청 순서대로 처리되는것을 보장한다.
- 원자성(Atomic) : 전체가 수행되거나 전체가 실패되는 행위로, 부분적인 성공은 존재하지 않는다.
- 단일 이미지 뷰 제공 (Single System Image) : 클라이언트는 어떤 주키퍼 서버에 접속하더라도 동일한 데이터 뷰를 제공 받는다.
- 안정성 (Reliability) : 주키퍼에 저장된 데이터는 (클라이언트의 명시적인 호출에 의해 수정되지 않는 한) 영속성을 가지고 있다.
출처: https://sungwookkang.com/1431 


eventual consistency 와 strong consistency
https://velog.io/@soongjamm/Eventual-Consistency-%EB%9E%80 
CAP 이론 중 일관성과 가용성을 기준으로 갈림

Consistency 일관성 : 모든 사용자가 동일한 값을 반환 받음
Availability 가용성 : 모든 요청에 대해 (실패가 아닌) 정상적인 응답 반환 받음

결론
eventual consistency
- 업데이트 후, 모든 노드가 업데이트 되지 않음(정합성이 맞춰지지 않음)
- 사용자는 오래된 정보를 볼 수 있어 가용성이 유지되나 일관성 없음
- 언젠가는 모든 노드에 업데이트가 적용(언젠가는 정합성이 맞춰짐)
- 예) Zookeeper

strong consistency : 
- 업데이트 후, 모든 노드가 업데이트 적용받을 때 까지 lock 에 걸림(정합성이 처음부터 유지됨)
- lock 에 걸려서 사용자가 사용 불가. 가용성 없으나 일관성은 유지
- 예) RDB



왜 3대 이상의 홀수 개수로 유지하는 것이 좋은가?

짝수로 운영해도 문제는 없으나
짝수나 홀수나 장애(결함) 허용 개수가 동일함

3대인 경우 과반수는 3/2+1 = 2대 : 결함 허용 개수는 1대 (=3-2)
4대인 경우 과반수는 4/2+1 = 3대 : 결함 허용 개수는 1대 (=4-3)
5대인 경우 과반수는 5/2+1 = 3대 : 결함 허용 개수는 2대 (=5-3)
6대인 경우 과반수는 6/2+1 = 4대 : 결함 허용 개수는 2대 (=6-4)


 

 

HDFS 는 one-copy-update-semantics consistency 를 갖음

모든 클라이언트는 파일의 복사본이 하나만 존재하는 것처럼 파일의 내용을 동일하게 봄

모든 사용자가 파일 업데이트를 즉시 볼 수 있으며

디렉터리 목록 결과는 해당 디렉터리 내의 최신 상태를 보여줌

 

반면 S3 는 eventual consistency 를 갖음

업데이트가 모든 사용자에게 영향을 끼치기까지 시간이 좀 걸림

 

참고  [공식 문서] [HDFS vs S3 비교]

 

 

 

 

 

 

 

 

 

하둡 MR 은 잡의 입력을 입력 스플릿 혹은 스플릿 이라는 고정 크기 조각으로 분리

분리 된 개수만큼 맵 태스크가 생성

스플릿의 각 레코드를 사용자 정의 맵 함수로 처리

 

스플릿 크기가 작을 수록, 병렬 처리성(부하 분산)에 효과적이다.

하지만 크기가 너무 작아지면 스플릿 관리와 맵 태스크 생성을 위한 오버헤드 때문에

잡의 실행 시간이 증가함

 

일반적인 스플릿 크기는 HDFS 블록의 기본 크기인 128mb 가 적당

최적의 스플릿 크기가 HDFS 블록 크기(128mb)와 같아야 하는 이유는 

그 블록 크기가 단일 노드에 저장된다고 확신할 수 있는 가장 큰 입력이기 때문

하나의 스플릿이 두 블록에 걸쳐있을 때(128mb 이상일 때)

두 블록 모두 저장하는 HDFS  노드는 존재할 가능성이 낮아서

스플릿의 일부 데이터를 네트워크를 통해 맵 태스크가 실행되는 다른 노드로 전송해야 함

이렇게 되면 맵 태스크 전체가 로컬 데이터만 이용할 때보다 느려짐

 

입력 스플릿에 대한 추가 설명 : https://eyeballs.tistory.com/264

 

 

맵 태스크의 결과는 HDFS 가 아닌 로컬 디스크에 저장됨.

HDFS 에 저장되면 replication 해야하니까 적절치 않음.

 

 

Reduce 태스크의 경우 입력값이 Map 태스크의 결과값이기 때문에

HDFS 를 이용한 데이터 지역성의 장점이 없음.

 

 

위에 필기 중간에 맵리듀스 셔플 정렬 부분 그림

 

 

아래는 셔플 절차를 다시 쓴 것

 

 

리듀서 부분에 대해 좀 더 알아본다.

 

맵의 결과는 맵이 실행된 노드의 로컬 디스크에 저장됨.

맵 태스크는 각기 다른 시간에 끝날 수 있으므로,

리듀스 태스크는 각 맵 태스크의 출력이 끝나는 즉시 리듀스가 실행되고 있는 노드로 맵의 결과를 복사하기 시작.

 

맵 출력의 크기가 충분히 작다면, 리듀스 태스크 JVM 메모리로 복사가 됨.

인메모리 버퍼가 한계치에 도달하거나

맵 출력 수가 한계치에 도달하면 병합되어 디스크에 spill이 됨.

(이때 컴바이너가 지정되었다면 병합 도중 실행되어 디스크에 쓰여지는 데이터양을 줄임)

 

복사된 파일이 디스크에 저장축적되면 background thread 가 이들을 더 크고 정렬된 형태의 파일로 병합.

(추후 병합 시간을 절약하기 위함)

 

모든 맵 출력이 복사되었다면, 리듀스 태스크는 정렬병합단계로 이동

맵 출력을 병합하고 정렬 순서를 유지

이 작은 라운드 단위로 이뤄지고 병합 계수에 따라 파일을 병합

예를 들어 50개의 맵 출력이 존재하고 병합계수가 10이라면 5개의 라운드가 구성되어

각 라운드마다 10개의 파일을 하나로 병합함

결과적으로 5개의 중간 파일이 생성

 

이 5개의 중간 파일들을 하나로 병합하지 않고,

곧바로 리듀스 함수로 전송하여 디스크 IO를 줄임.

즉, 최종 라운드는 리듀스 함수로 곧바로 전송함.

 

 

 

 

 

 

 

 

 

+ Recent posts