spark streaming 을 이용하여 데이터 파이프라인을 구축해본다.

 

두 개의 spark streaming 프로그램을 사용한다.

 

첫번째 saprk streaming 에서는, kafka topic1 로 들어오는 단어들을 읽고 white space를 기준으로 분리하여 kafka topic2 에 저장한다.
두번째 spark streaming 에서는, kafka topic2 로 들어오는 단어들을 읽고 word count를 하여 console 에 출력한다.

 

 

 

1. kafka 와 연동 가능한 환경을 구축해둔다.

기본적인 spark 와 kafka 연동 방법은 여기 링크를 참고한다.

위의 링크에 나온 대로 환경이 구축되어있다는 전제 하에 아래 내용을 이어간다.

 

 

 

2. terminal 하나를 열고, 아래 명령어를 참고하여 kafka topic1 에 실시간으로 문장을 입력할 수 있도록 한다.

 

< topic1 만들기 >

./kafka/bin/kafka-topics.sh --zookeeper localhost:2181/localhost --replication-factor 1 --partitions 1 --topic topic1 --create

 < topic1 에 입력 >

./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

> (여기 입력 후 enter)

....

명령어들을 그대로 사용하면 반드시 에러가 나니, 자신의 환경에 맞도록 적절하게 zookeeper와 kafka의 ip 와 port 등을 바꾼다.

 

 

 

 

3. 아래 명령어를 이용하여 첫번째 spark shell 을 실행한다.

 

spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1

 

 

이 첫번째 spark-shell 에서는 kafka topic1 로 들어오는 단어들을 읽고

white space를 기준으로 분리하여 kafka topic2 에 저장 할 것이다.

 

아래 코드를 실행시킨다.

 

import org.apache.spark.sql.streaming.Trigger

spark.conf.set("spark.sql.shuffle.partitions", 5)

val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "topic1").option("startingOffsets", "earliest").load()

import org.apache.spark.sql.functions.udf

val toStr = udf((payload: Array[Byte]) => new String(payload))

val parsing = kd.withColumn("value", toStr(kd("value"))).select("value").as[String].flatMap(_.split(" ")).withColumn("key",lit("null"))

parsing.selectExpr("key","value").writeStream.trigger(Trigger.ProcessingTime("5 seconds")).format("kafka").option("kafka.bootstrap.servers","11.11.11.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic","topic2").start().awaitTermination()

 

 

 

 

4. 아래 명령어를 이용하여 첫번째 spark shell 을 실행한다.

 

spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1

 

 

이 두번째 spark-shell 에서는 kafka topic2 로 들어오는 단어들을 읽고

word count를 하여 console 에 출력 할 것이다.

 

아래 코드를 실행시킨다.

 

import org.apache.spark.sql.streaming.Trigger

spark.conf.set("spark.sql.shuffle.partitions", 5)

val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "topic2").option("startingOffsets", "earliest").load()

import org.apache.spark.sql.functions.udf

val toStr = udf((payload: Array[Byte]) => new String(payload))

val wc = kd.withColumn("value", toStr(kd("value"))).groupBy("value").count()

wc.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).outputMode("complete").format("console").start().awaitTermination()

 

 

 

실제로 돌아가는 모습을 아래 영상으로 확인 가능.

왼쪽 위의 터미널에서 값을 입력하면

오른쪽 위의 터미널에서 값들을 공백 기준으로 분리한 뒤

오른쪽 아래 터미널에서 분리된 값을 받아 wordcount 한다.

 

 

 

 

 

참고

https://www.youtube.com/watch?v=nBHfOU4YieY

 

 

 

 

+ Recent posts