spark streaming 을 이용하여 데이터 파이프라인을 구축해본다.
두 개의 spark streaming 프로그램을 사용한다.
첫번째 saprk streaming 에서는, netcat 으로 들어오는 단어들을 white space를 기준으로 분리하여 hdfs 에 저장한다.
두번째 spark streaming 에서는, 첫번째 spark streaming 의 결과값들을 이용하여 word count를 한다.
1.먼저 netcat 을 설치한다.
apt-get install netcat -y
|
2.아래 스크립트를 실행시키면, spark streaming 의 checkpointLocation 가 리셋되고, 기존 결과값들이 삭제된다.
또한 netcat 을 이용한 9999번 TCP 포트가 열린다.
< startPipeline.sh >
#! /bin/bash
|
3.아래 명령어로 첫번째와 두번째 spark streaming을 위한 spark shell 을 가동시킨다.
spark-shell --master yarn
|
4.첫번째 spark shell 에서 아래 코드를 실행하여 5초 동안 netcat 으로 들어오는 모든 단어들을 white space 기준으로 분리하여 hdfs 에 저장한다.
import org.apache.spark.sql.streaming.Trigger spark.conf.set("spark.sql.shuffle.partitions", 5) val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load() val words = lines.as[String].flatMap(_.split(" ")) val query = words.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("text").option("path", "/streaming/out").option("checkpointLocation", "/streaming/checkpointLocation").start().awaitTermination()
|
5.두번째 spark shell 에서 아래 코드를 실행하여 10초 동안 첫번째 spark streaming의 결과값들을 모아 word count 연산을 하여 console 에 띄운다.
import org.apache.spark.sql.streaming.Trigger
|
아래는 netcat 을 통해 입력한 input 값이다.
아래는 첫번째 spark streaming 을 거쳐 두 번째 spark streaming 에서 처리된 데이터의 결과값이다.
10초 주기로 업데이트 되기 때문에, count 숫자가 비규칙적으로 업데이트 되는 걸 볼 수 있다.
참고
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
'Spark' 카테고리의 다른 글
[Spark] kafka 와 연동하는 방법 (10) | 2020.03.23 |
---|---|
[Spark Steaming] kafka 를 이용하여 Data pipeline 만들어보기 (1) | 2020.03.23 |
[Spark] 기술 질문 대비 적어두는 것들 (0) | 2020.03.07 |
[Spark] SparkSession 이란? (0) | 2020.02.21 |
[Spark] executor 개수 늘리는 방법 (0) | 2020.02.15 |