Nifi 에서 Kafka 의 topic 에서 메세지를 consume 하는 중

알아낸 사실을 여기 적는다.

 

nifi 는 3대의 노드로 Cluster 구성이 되어있음.

 


< Kafka 환경 >

Broker : 3대 (전체 3대 중 3대 모두 사용)
Topic : 1개
Partition : 3
Topic Name : EYE

EYE topic 에 producer 를 통해 123, 234, 345 라는 데이터를 각각 넣은 다음
Nifi ConsumKafka_2_6 으로 읽어들임.
Nifi Broker 주소(3대 중 1대 주소)와 Topic Name, Group Name 은 모두 동일함
Nifi Primary Processor 는 없음

예상) Broker 가 3개이므로 각 노드가 데이터를 하나씩 읽을 것이다.(예를 들어 노드 1에서 123, 노드 2에서 345, 노드 3에서 234)
실제) 노드 1에서는 아예 읽지 않았고, 노드 2에서는 234, 노드 3에서는 123, 345 를 읽었음.
  Broker 개수와 상관 없이 Nifi 노드들에 동일하게 분배되지 않음.

만약 데이터 개수가 10개(0,1,2,3,4,5,6,7,8,9)라면 어떻게 될까
이번에는 노드 1에서 5개 (0,5,4,7,8) 노드 2에서 4개 (3,6,2,9,1) 노드 3에서는 아예 읽지 않음.
여러번 반복해도 중복 없이 노드들끼리 서로 나눠갖는 것을 볼 수 있었음(균등하지 않고 그냥 랜덤하게 나눠 갖음)
예를 들어 어떤 때는 노드 1이 8개, 노드 2가 1개, 노드 3이 1개 가져간 적도 있고
어떤 때는 노드 1이 3개, 노드 2가 5개, 노드 3이 2개 가져간 적도 있음.

결론적으로 노드들 중 데이터를 먼저 뽑아읽는 쪽에서 데이터를 가져가는 듯 함.

 


< Kafka 환경 >

Broker : 1대 (전체 3대 중 1대만 사용)
Topic : 1개
Partition : 1
Topic Name : EYE

EYE topic 에 producer 를 통해 10개의 데이터를 각각 넣은 다음
Nifi ConsumKafka_2_6 으로 읽어들임.
Nifi Broker 주소(3대 중 사용중인 1대 주소)와 Topic Name, Group Name 은 모두 동일함
Nifi Primary Processor 는 없음

이번에는 Partition 을 1 로 축소시켜봄.
위와 동일한 테스트를 진행

예상) 위의 테스트 결과와 마찬가지로 Nifi 3대 Node 들 중 데이터를 먼저 읽는 노드에서 데이터를 가져갈 것이다.
실제) 3대의 Nifi 노드 중 한 대만 모든 데이터를 읽음. 10개의 데이터를 Kafka 에 넣었는데, 노드 2가 모두 읽음.
  몇 번을 해봐도 노드 2가 읽는 것은 변하지 않았음.
  Broker 주소 3대 중 사용중인 하나 주소만 넣은 상태에서는 노드 2만 읽음.
  사실 Broker 3대 중 어느 것을 넣어도 상관 없으니 사용하지 않는 주소를 넣어봄.
  Nifi ConsumKafka_2_6 에 첫번째 Broker 주소를 넣었을 때, 두번째를 넣었을 때, 세번째를 넣었을 때
  모두 읽는 노드는 다르지만, 일단 읽기만 하면 모든 데이터를 다 읽음
  예를 들어
  첫번째 주소를 사용하면 노드 3이 모두 읽고, 노드 1, 2 는 읽지 않음
  두번째 주소를 사용하면 노드 2가 모두 읽고, 노드 1, 3 은 읽지 않음
  세번째 주소를 사용하면 노드 2가 모두 읽고, 노드 1, 3 은 읽지 않음
  읽는 노드도 랜덤하게 정해지는 듯.

결론적으로, Nifi 에서 데이터를 읽는 노드는 Partition 개수에 따라 정해지고, 읽는 노드는 랜덤으로 정해짐 
Partition 이 1면 3대의 Nifi 노드 중 랜덤하게 정해진 한 대에서 모든 데이터를 읽음.
Partition 이 2면 3대의 Nifi 노드 중 랜덤하게 정해진 두 대에서 데이터를 랜덤하게 읽음.
Partition 이 3이면 3대의 Nifi 노드 중 랜덤하게 정해진 세 대에서 데이터를 랜덤하게 읽음.

Kafka 는 partition 에 데이터를 round robin 으로 넣지만,
Nifi 는 그것과 상관 없이 랜덤으로 읽는 것 같음.

Nifi 노드는 3대인데 Partition 이 4 이상이라면?
Partition 이 4 이상이여도 3일때와 마찬가지로 동작함.

 

 

추가적으로, 3 대의 Broker 주소를 Nifi ConsumeKafka_2_6 옵션에 1개만 넣었을 때, 3개 모두 넣었을 때 동작의 차이는 없는 듯 하나 성능의 차이는 확인해야봐야 함.

 

 


내가 ConsumKafka_2_6 Processor 를 사용하여 Kafka 에서 데이터를 가지고 온 뒤,
실행중이던 Process 를 stop 하면 갑자기 Kafka 쪽에서 Partition 을 재분배한다(....)
Kafka describe 명령어가 'rebalance 하는 중이니까 나중에 하셈' 이라는 경고와 함께 막히기 때문에 알고 있음.
새 Consumer 가 추가되거나, 기존 Consumer 가 사라질 때 rebalance 가 된다고 하는데 나는 stop 하고나서 rebalance 되는 것을 보았음.
관련 내용은 아래 참고.

참고 community.cloudera.com/t5/Support-Questions/NiFi-Uneven-Distribution-ConsumeKafka/m-p/208445#M170402

 


데이터가 많이 쌓여있는 Kafka 에 새롭게 Consumer Processor(ConsumeKafka) 를 연결하면,
이 Consumer Processor 는 Kafka 에 쌓여있는 데이터를 모두 Consume 할까? 실제로 해보니 아님.
이 Consumer Processor 는 자신이 생성되고 난 이후에 Kafka 에 쌓인 데이터만 consume 하더라.

예를 들어, 두 개의 Consumer Processor 가 있고 A cp, B cp 라고 부르자.
A cp 의 group id 는 test1, B cp 의 group id 는 test2 라고 하자.
먼저 Kafka 에 1, 2, 3  데이터를 쌓는다.
그 다음에 A cp 를 생성한다.
Kafka 에 4, 5 데이터를 쌓으면 A cp 는 4, 5 를 Consume 한다.
그 다음에 B cp 를 생성한다.
Kafka 에 6, 7 데이터를 쌓으면 A cp 가 6, 7 를, B cp 가 6, 7 를 Consume 한다.
A cp 를 잠깐 멈추고 Kafka 에 8, 9 데이터를 쌓으면 B cp 가 8, 9 를 Consume 한다.
A cp 를 다시 실행시키면 8, 9 를 Consume 한다.

A cp 를 멈추었을 때, A cp 를 복제한 C cp 를 A cp 대신 실행시켜도 A cp 와 똑같이 동작함.
다시 말해 멈춰있는 A cp 대신 C cp 를 동작시키면 C cp 가 8, 9 를 Consume 한다는 것.
이것으로 보아, Kafka 에 group id 가 등록(?)되는 것을 기준으로 consumer processor 가 데이터를 가져오나 봄.
즉, A cp 가 처음에 Kafka 에 group id (test1) 을 등록하면, C cp 는 이미 등록된 group id (test1) 을 이용해서
8, 9 데이터를 consume 한다는 것.

 

 

 

 

참고

www.popit.kr/kafka-consumer-group/

 

 

 

 

 

 

+ Recent posts