spark streaming 을 이용하여 데이터 파이프라인을 구축해본다.
두 개의 spark streaming 프로그램을 사용한다.
첫번째 saprk streaming 에서는, kafka topic1 로 들어오는 단어들을 읽고 white space를 기준으로 분리하여 kafka topic2 에 저장한다.
두번째 spark streaming 에서는, kafka topic2 로 들어오는 단어들을 읽고 word count를 하여 console 에 출력한다.
1. kafka 와 연동 가능한 환경을 구축해둔다.
기본적인 spark 와 kafka 연동 방법은 여기 링크를 참고한다.
위의 링크에 나온 대로 환경이 구축되어있다는 전제 하에 아래 내용을 이어간다.
2. terminal 하나를 열고, 아래 명령어를 참고하여 kafka topic1 에 실시간으로 문장을 입력할 수 있도록 한다.
< topic1 만들기 > ./kafka/bin/kafka-topics.sh --zookeeper localhost:2181/localhost --replication-factor 1 --partitions 1 --topic topic1 --create |
< topic1 에 입력 > ./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1 > (여기 입력 후 enter) .... |
명령어들을 그대로 사용하면 반드시 에러가 나니, 자신의 환경에 맞도록 적절하게 zookeeper와 kafka의 ip 와 port 등을 바꾼다.
3. 아래 명령어를 이용하여 첫번째 spark shell 을 실행한다.
spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1
|
이 첫번째 spark-shell 에서는 kafka topic1 로 들어오는 단어들을 읽고
white space를 기준으로 분리하여 kafka topic2 에 저장 할 것이다.
아래 코드를 실행시킨다.
import org.apache.spark.sql.streaming.Trigger spark.conf.set("spark.sql.shuffle.partitions", 5) val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "topic1").option("startingOffsets", "earliest").load() import org.apache.spark.sql.functions.udf val toStr = udf((payload: Array[Byte]) => new String(payload)) val parsing = kd.withColumn("value", toStr(kd("value"))).select("value").as[String].flatMap(_.split(" ")).withColumn("key",lit("null")) parsing.selectExpr("key","value").writeStream.trigger(Trigger.ProcessingTime("5 seconds")).format("kafka").option("kafka.bootstrap.servers","11.11.11.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic","topic2").start().awaitTermination()
|
4. 아래 명령어를 이용하여 첫번째 spark shell 을 실행한다.
spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1
|
이 두번째 spark-shell 에서는 kafka topic2 로 들어오는 단어들을 읽고
word count를 하여 console 에 출력 할 것이다.
아래 코드를 실행시킨다.
import org.apache.spark.sql.streaming.Trigger spark.conf.set("spark.sql.shuffle.partitions", 5) val kd = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "11.11.11.8:9092").option("subscribe", "topic2").option("startingOffsets", "earliest").load() import org.apache.spark.sql.functions.udf val toStr = udf((payload: Array[Byte]) => new String(payload)) val wc = kd.withColumn("value", toStr(kd("value"))).groupBy("value").count() wc.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).outputMode("complete").format("console").start().awaitTermination()
|
실제로 돌아가는 모습을 아래 영상으로 확인 가능.
왼쪽 위의 터미널에서 값을 입력하면
오른쪽 위의 터미널에서 값들을 공백 기준으로 분리한 뒤
오른쪽 아래 터미널에서 분리된 값을 받아 wordcount 한다.
참고
https://www.youtube.com/watch?v=nBHfOU4YieY
'Spark' 카테고리의 다른 글
[Spark] Dataframe 만드는 여러가지 방법 링크 (0) | 2020.03.26 |
---|---|
[Spark] kafka 와 연동하는 방법 (10) | 2020.03.23 |
[Spark Streaming] HDFS 를 이용하여 Data pipeline 만들어보기 (0) | 2020.03.20 |
[Spark] 기술 질문 대비 적어두는 것들 (0) | 2020.03.07 |
[Spark] SparkSession 이란? (0) | 2020.02.21 |