Docker fluentd image 를 사용하여 kafka 와 연동하는 방법에 대해 설명한다.

Docker fluentd image 는 Alpine 버전과 Debian 버전이 존재하는데,

이 글에서는 Debian 버전을 사용한다.

왜냐하면 Debian 버전이 jemalloc 기능을 갖추고있기 때문

(Debian version is recommended officially since it has jemalloc support. [출처])

 

kafka 는 docker search kafka 검색시 가장 많은 stars 를 받은 "wurstmeister/kafka" image 를 사용한다.

zookeeper 도 더불어 "wurstmeister/zookeeper" image 를 사용한다.

 

http message 데이터를 fluentd 로 받아서 kafka topic (mytopic) 에 produce 하는 conf와

kafka topic (mytopic) 에서 데이터를 fluentd 로 consume 하는 conf 를 작성하여 실행해본다. 



 

하나의 서버 위에 Docker fluentd 와 Docker Kafka+zookeeper 를 설치한다. 

이 글 작성하면서 local 에 설치하였는데 그 과정을 고스란히 글에 담았다.

 

Docker Swarm 을 사용하여 Docker Container 간 네트워크 전송을 가능하게 하였다.

이 글 작성시 사용한 Docker Swarm Bridge 네트워크의 이름은 my-net 이다.

Docker Swarm network 생성 방법은 다음 링크 참고

eyeballs.tistory.com/85

 

 

아래 Docker 명령어를 이용하여, fluentd, kafka, zookeeper 를 설치한다.

fluentd conf 파일을 저장해두는 곳을 /home/fluentd/conf 라고 하자.

 

먼저 /home/fluentd/conf 에 fluentd.conf 를 생성하고 아래 내용을 넣는다.

<source>
  @type http
  port 9880
  bind 0.0.0.0
</source>

<match **>
  @type stdout
</match>

 

아무 의미 없는 conf 를 넣는 이유는 아래와 같다.

docker fluentd 가 실행되고 conf 를 해석할 때 설치되어있지 않은 다른 plugin 이 있으면

docker fluentd container 가 죽어버리기 때문이다.

 

아래 명령어를 통해 docker containers 를 실행시킨다.

 

< fluentd >


docker run -d \

-p 9880:9880 -p 9292:9292 \

--network my-net \

--name fluentd \

-v /home/fluentd/conf:/fluentd/etc \

-e FLUENTD_CONF=fluentd.conf \

fluent/fluentd:v1.6-debian-1

< zookeeper >


docker run -d \

--network my-net \

--name zookeeper \

wurstmeister/zookeeper

< kafka >


docker run -d \

--network my-net \

--name kafka \

-e KAFKA_ADVERTISED_HOST_NAME=kafka \

-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \

-v /home/kafka/run/docker.sock:/var/run/docker.sock \

wurstmeister/kafka:2.12-2.5.0

 

fluentd 의 9880 포트를 통해 http message 를 전달한다.

fluentd 의 9292 포트는 WebUI 를 위한 것이므로 생략해도 된다.

kafka 의 volume path 는 마음대로 정해도 좋다.

 

fluentd container 에 fluent-plugin-kafka plugin 을 설치한다.

 

docker exec -u root fluentd sh -c "apt update; apt install gcc make -y ; gem install fluent-plugin-kafka"

 

host os 의 /home/fluentd/conf 에 fluentd.conf 파일을 생성한다.

아래 내용을 참고하여 conf 를 작성한다.

 

< http message 를 fluentd 로 받고 kafka topic (mytopic) 에 produce 하는 conf >


# http msg -> fluentd -> kafka


# http message 를 받는다.

<source>

  @type http

  port 9880

  bind 0.0.0.0

</source>


# http.msg 태그로 match 한다.

<match http.msg>

  @type kafka2


  # list of seed brokers

  # 여기서 brokers 의 ip 가 kafka 인 이유는, Docker Swarm 때문이다.

  brokers kafka:9092

  use_event_time true


  <buffer>

    @type file

    path /fluentd/buffer

    flush_interval 3s

  </buffer>


  # topic settings

  # topic_key 는 토픽명

  # default_topic 은 topic_key 가 없을 때 사용되는 default 토픽명

  topic_key mytopic

  default_topic mytopic


  <format>

    @type json

  </format>


  # producer settings

  required_acks -1

</match>

< kafka topic (mytopic) 에서 fluentd 로 데이터를 consume 하는 conf >


# kafka -> fluentd


<source>

  @type kafka_group

  brokers kafka:9092

  topics "mytopic"

  consumer_group mygroup

  add_prefix kafka

  start_from_beginning true

</source>


# 태그는 kafka.[topic 이름]이 된다.

# 여기서는 kafka.mytopic

<match kafka.mytopic>

  @type stdout

</match>

 

새로운 터미널을 열고, 아래 명령어를 통해 fluentd 에 http message 를 보낼 수 있다.

가장 뒤에 http.msg 가 태그가 된다.

 

curl -X POST -d 'json={"json":"test", "sample":"hello world!"}' http://localhost:9880/http.msg

 

해당 토픽에 json message 가 잘 들어갔는지 확인하기 위해 

kafka container 에 들어간 후, 다음 명령어로 consume 해본다.

 

docker exec -it kafka bash

kafka-console-consumer.sh --topic mytopic --from-beginning --bootstrap-server kafka:9092

 

참고로 kafka 의 topic 을 삭제하려면 kafka container 에 들어간 후, 

server.properties 에 delete.topic.enable=true 를 추가해야 한다.

 

docker exec -it kafka bash

vi /opt/kafka_2.12-2.5.0/config/server.properties

delete.topic.enable=true 추가 후 저장

kafka-topics.sh --delete --topic mytopic --zookeeper zookeeper --bootstrap-server kafka:9092

만약 --zookeeper 나 --bootstrap-server 두 가지 옵션 중 하나만 써야 한다는 경고가 나오면,

둘 중 하나를 빼고 실행하자.

 

kafka topic 리스트를 확인하려면 아래 명령어 참고

 

kafka-topics.sh --list --bootstrap-server kafka:9092



 

참고로, fluentd 의 로그를 보려면 

docker logs -f fluentd 하여도 되지만,

docker run 옵션으로 -v ...../log:/fluentd/log 을 주어

local 에서 log 가 쌓이는 것을 볼 수 도 있다.

 

 

 

 

참고

 

Docker fluentd : https://hub.docker.com/r/fluent/fluentd/ 

docker deployment 공식 fluentd 문서 : https://docs.fluentd.org/container-deployment/install-by-docker 

kafka output plugin 공식 fluentd 문서 : https://docs.fluentd.org/output/kafka 

fluent-plugin-kafka plugin : https://github.com/fluent/fluent-plugin-kafka 

kafka 기초 명령어 : https://kafka.apache.org/quickstart 

 

 

+ Recent posts