아래서 설명하고 있는 kafka 는 standalone 모드로 돌아감을 밝힙니다.

이 포스트의 주제는 "spark 와 kafka 의 연동" 이기 때문에

kafka 는 standalone 모드로 간단하게 구동시켰습니다.

 

또한 이 포스트에 이론적인 내용은 없습니다.

 

 

 

 

 

 

일단 내가 테스트한 환경 이러하다.

ubuntu 가 돌아가는 세 대의 각기 다른 서버를 사용하여 spark cluster를 구축하였고

spark 의 master node 가 동작하는 서버위에 zookeeper 와 kafka 를 standalone 모드로 실행시켰다.

즉 아래와 같은 상황임.

 

서버 이름 server1 (11.11.11.8) server2 (11.11.11.9) server3 (11.11.11.10)
동작하는 것들

spark master

zookeeper (standalone)

kafka (standalone)

spark worker1 spark worker2

 

spark 는 yarn 을 이용하여 자원을 조율한다.

 

 

 

 

자 spark 를 kafka 와 연동하기 위해 아래 절차를 따른다.

spark 를 이용하여 kafka에서 읽고 쓰기 위한 코드는 여기를 적극 참고한다. 자세히 나와있다.

 

 

 

 

 

1. zookeeper를 설치한다.

자세한 zookeeper 설치 방법 링크

 

zookeeper를 standalone 모드로 설치하기 위해, 나의 zoo.cfg 는 아래처럼 작성하였다.

 

< zoo.cfg >

 

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/root/apache-zookeeper-3.5.6-bin/zoo_data

clientPort=2181

 

 

보면 server.1=xxx... 부분이 없다.

이 부분이 없으면 standalone 모드로 돌아가게 된다.

 

 

 

2. kafka를 설치한다.

자세한 kafka 설치 방법 링크

 

kafka 역시 standalone 모드로 설치하기 위해, server.properties 를 아래처럼 작성하였다.

 

< server.properties >

 

broker.id=1

...(중략)...

log.dirs=/root/kafka_2.11-2.4.1/data

...(중략)...

zookeeper.connect=localhost:2181/localhost

 

 

정말 localhost 에서 하나만 돌아가도록 만든 것임! ㅎㅎ;;

 

zookeeper 와 kafka 를 실행시키고 아래 명령어를 통해 port 가 잘 열려있는지, 잘 실행 중인지 확인한다.


Zookeeper 실행
bin/zkServer.sh start

Kafka 실행
bin/kafka-server-start.sh ../config/server.properties

 

주키퍼 포트확인
netstat -ntlp | grep 2181

카프카 포트확인
netstat -ntlp | grep 9092

 

 

 

 

 

 

3. kafka의 topic 을 만들어둔다.

아래 명령어를 토대로 tp1 이라는 이름의 topic 을 만든다.

 

 

./kafka/bin/kafka-topics.sh --zookeeper localhost:2181/localhost --replication-factor 1 --partitions 1 --topic tp1 --create

 

 

 

 

 

 

 

4. 터미널 두 개를 열어두고, 하나는 kafka에 메세지를 전달하는 용도로, 하나는 spark driver를 실행하는 용도로 사용한다.

spark 는 shell 이던 submit 던 상관 없으며, batch 나 streaming 방식으로 kafka 와 연동 가능하다.

 

첫번째 터미널에서는 아래 명령어를 이용하여 kafka tp1 토픽에 메세지(log)를 입력한다.

 

./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tp1

 

> (여기 입력 후 enter)

....

 

 

두번째 터미널에서는 (일단) spark-shell 을 열어서 kafka tp1 토픽의 메세지를 batch로 읽어본다.

아래 명령어처럼 --packages 옵션에 spark kafka lib를 넣어줘야 한다.

 

spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1

 

 

shell 이 실행되면 아래 코드를 넣는다.

이 때 kafka 를 구별하기 위해 넣는 kafka.bootstrap.servers 에는 kafka 가 실행되는 서버의 호스트 이름을 적는다. 나의 경우 11.11.11.8

subscribe 에는 읽고싶은 topic name을 넣는다.

 

< batch 코드 >

 

val kd = spark.read.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "tp1").option("startingOffsets","earliest").load()

kd.show

 

 

아래는 kafka tp1 토픽에서 log 를 읽어서 show 한 모습

batch 로 읽었기 때문에 spark action(kd.show())이 실행될 때 kafka 가 갖고 있는 log 들이 한 번에 읽힌다.

binary 로 읽힌다(...)

 

 

streaming 으로 읽고 싶다면 아래 코드를 사용한다.

 

< streaming 코드 >

 

import org.apache.spark.sql.streaming.Trigger

val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "tp1").option("startingOffsets", "earliest").load()

val stream = kd.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

 

 

streaming 코드가 실행된 모습이다. append mode 이기 때문에 이전 값들 말고 새로 입력된 값들만 console 에 출력된다.

 

 

여기서 trigger 는 옵션이므로 넣던 안 넣던 개발자 자유임.

 

 

 

 

 

 

5. binary 로 된 log 내용을 string 값으로 파싱한다.

batch 의 경우 아래와 같이 udf 를 이용하여 binary 를 String 값으로 파싱 가능하다.

 

import org.apache.spark.sql.functions.udf

val kd = spark.read.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "tp1").option("startingOffsets","earliest").load()

val toStr = udf((payload: Array[Byte]) => new String(payload))

val parsing = kd.withColumn("value", toStr(kd("value")))

parsing.show()

 

여기서 "value" 은 파싱된 값이 들어갈 새 column 의 이름.

kd("value") 는 binary 값이 들어있는 dataframe의 column 이름 (kd 객체의 "value" 필드)

결과적으로 기존의 value 값을 새로운 value 가 덮어쓰는 형태가 된다.

 

 

아래는 실행된 모습. value filed 를 보면 binary 로 표현되던 값들이 string 값으로 표현되고 있다.

 

 

streaming 코드에서도 위와 마찬가지로 udf 를 적용하면 binary 를 파싱하여 읽을 수 있다.

 

 

 

 

 

 

6. 새로운 topic 에 값을 새로 써본다.

아래 코드를 사용하면 된다. 

 

parsing.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("topic","newtopic").save()

 

 

selectExpr 에서 선택한 것은 key 와 value 라는 이름의 column 이 반드시 들어가 있어야한다.

아니면 org.apache.spark.sql.AnalysisException: Required attribute 'value' not found; 같은 error 가 뜨더라.

 

위의 코드를 실행시킨 후, kafka 에서 직접 consumer 를 실행시켜 newtopic 의 내용을 읽어볼 수 있다.

consumer 를 실행시키기 위해서 아래 코드 참고

 

./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newtopic --from-beginning

 

 

실제로 읽어본 모습은 아래와 같다.

 

코드 실행한 후

consumer 에서 방금 저장한 message 를 읽어본 모습

 

 

 

 

만약 streaming 으로 실시간 값을 kafka 에 쓰고 싶다면 아래 절차를 따른다.

먼저 hdfs 에 checkpointLocation 으로 사용할 path 를 만든다.

 

hdfs dfs -mkdir -p  /streaming/checkpointLocation

 

 

위의 streaming 코드에서 writeStream 부분의 format을 console 에서 kafka 로 바꾸고

여러 옵션들과 key, value 들을 select 하여 write 한다.

 

import org.apache.spark.sql.streaming.Trigger

val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "tp1").option("startingOffsets", "earliest").load() //여기까지 읽는 코드

import org.apache.spark.sql.functions.udf

val toStr = udf((payload: Array[Byte]) => new String(payload))

val parsing = kd.withColumn("value", toStr(kd("value")))

parsing.withColumn("key",lit("null")).selectExpr("key","value").writeStream.trigger(Trigger.ProcessingTime("5 seconds")).format("kafka").option("kafka.bootstrap.servers","11.11.11.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic","newtopic").start().awaitTermination() 

 

 

withColumn 을 이용하여, 내용은 null 이고 "key" 라는 이름의 column 을 새로 만들어주었다.

이게 없으면 "key" column 이 계속 없다고 뜨기 때문(에러 내용 : org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [value];;)

 

위의 writeStream 코드를 실행시킨 결과. 실시간으로 받은 데이터를 kafka 에 다시 쓰고 있기 때문에 console 에 아무것도 찍히지 않는다.

위의 streaming 이 실행되고 있는 도중에 아래처럼 kafka topic(tp1) 에 a, b, c 를 입력하였다.

streaming 이 a, b, c 를 받고 kafka newtopic 에 값을 그대로 쓰는 것을 아래처럼 확인하였다.

 

 

 

 

 

 

 

 

7. 만약 spark-submit 으로 코드를 실행시키고 싶다면, sbt dependencies에 아래를 추가한다.

 

< build.sbt >

 

....

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.1"

....

 

 

 

 

 

spark-streaming-kafka lib 위치 : https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10

sample code : https://mapr.com/docs/61/Spark/StructuredStreamingWordCountApplication.html

IO 코드 공식 문서 : https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

간단한 IO 코드 참고 (batch, streaming) : https://docs.microsoft.com/ko-kr/azure/hdinsight/hdinsight-apache-kafka-spark-structured-streaming

kafka + spark streaming : https://alphahackerhan.tistory.com/14

11번가 kafka 컨슈머 애플리케이션 배포 전략 : https://medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72

 

 

+ Recent posts