spark structured streaming 운용시 알아야 할 명령어들을 적어둔다.

독학하면서 적는 것이기 때문에 틀린 내용일 수 있다.

아래 코드에서 사용하는 데이터는 여기서 받을 수 있다. 

 

 

- structured streaming 에서 파일 읽고 쓰는 방법 보여주는 아주 좋은 곳 링크

- spark streaming 데모영상 링크

 

 

-특정 위치 파일 읽는 법

hdfs 에서 읽을 땐
"hdfs:///directory/"

local 에서 읽을 땐
"file:///directory/"
*참고) local file 을 읽기 위해선 spark 실행 시 --master local 옵션을 줘야 한다.

s3 에서 읽을 땐
"s3://directory/data.txt"

 

s3 는 모르겠지만, local 이나 hdfs 는 파일을 읽는 게 아니고, dir 내에 있는 파일"들"을 읽어온다.

따라서 파일이 아니라 위에 처럼 path 를 지정해줘야 한다.

하나의 파일이 추가되게 되면, 추가된 파일이 스트림에 흘러가 spark로 처리 될 것이다.

자세한 것은 이곳 참고

 

 

 

 

- streaming 은 뒷단의 Thread 위에서 동작하게 되는데, 이 때 streaming의 쿼리나 연산이 끝나지 않았음에도 Thread가 종료되는 상황을 막기 위해 awaitTermination() 명령어를 start() 이후에 붙여준다. 붙여줘야 드라이버 프로세스가 종료되는 상황을 막는다.

 

 

 

 

- SparkSession 을 통해 spark.streams.active 명령어를 실행하면, 현재 몇 개의 streaming 작업이 돌아가고 있는지 Array 로 확인이 가능하다. 

 

 

- Schema 는 알아서 추리할 수 있도록 할 수 있지만, 되도록 직접 넣어주는 것이 좋다(지원을 안 한다거나 하는 이유였는데 지금 구체적으로 잊어버림). 하지만 직접 넣기에는 무리가 있으니, 아래처럼 Schema 를 가져오는 방법을 사용한다.

val schema = spark.read.format("json").load("/streaming_data/mydata").schema //read 로 읽은 파일의 schema를 가져온다.

val df = spark.readStream.format("json").schema(schema).load("/streaming_data") //readStream 옵션에 넣어준다.

 

 

 

- trigger 는 아래처럼 넣을 수 있다.

import org.apache.spark.sql.streaming.Trigger

 

spark.conf.set("spark.sql.shuffle.partitions", 5)

val path = "/data"

val schema = spark.read.format("json").load(path+"/mydata").schema

val streaming = spark.readStream.format("json").schema(schema).load(path)

 

//10초 마다 한 번씩 trigger

val group = streaming.groupBy("gt").count()

group.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))

.format("console").outputMode("complete").start().awaitTermination()

 

//딱 한 번만 trigger

val group = streaming.groupBy("gt").count()

group.writeStream.trigger(Trigger.Once())

.format("console").outputMode("complete").start().awaitTermination()

 

 

 

 

- event time 기준으로 스트리밍을 하려면 아래 코드처럼 하면 된다.

윈도우가 "event_time" column을 기준으로 10분이다.

 

import org.apache.spark.sql.functions.{window, col}

spark.conf.set("spark.sql.shuffle.partitions", 5)

val path = "/data"

val schema = spark.read.format("json").load(path+"/mydata").schema

val streaming = spark.readStream.format("json").schema(schema).load(path)

 

//아래 처럼 cast 연산을 통해 event time 을 "2015-02-23 10:21:55.327" 같은 형식으로 바꿔준다.

val withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

 

//예제 하나

withEventTime.groupBy(window(col("event_time"), "10 minutes")).count().writeStream.queryName("events_per_window").format("console").outputMode("complete").start().awaitTermination()

 

 

//예제 둘. 

withEventTime.groupBy(window(col("event_time"), "10 minutes"), col("User")).count().writeStream.queryName("events_per_window").format("memory").outputMode("complete").start().awaitTermination()

 

 

위와 같이 형식을 바꿔주는 이유는 스파크가 이게 시간이란 걸 알아듣게 하기 위해서임.

이 시간이 있는 테이블(withEventTime)을 기준으로 window 를 세팅하고 스트리밍 작업을 하면 됨.

만약 새로 추가된 데이터에서 (이벤트 시간 기준으로)늦게 온 데이터가 있다면

watermark 범위 내에 있을 때 스파크가 알아서 이벤트타임 순으로 처리 할 것이다.

watermark, window, trigger 을 이해하기 위하여 예제를 들어 자세한 설명한 페이지 참고

wateramrk 사용하는 방법은 아래 있음.

 

두번째 예제는 책에 있길래 적긴 했는데 무슨 의미인지 모르겠다.

직접 해봐도 실행도 안 되고..-_-;

 

 

 

 

 

 

- 바로 위에 있는 마지막 코드처럼 format 이 memory 인 경우, memory 에 생긴 결과를 보는 방법은 아래와 같다.

queryName 이 events_per_window 이기 때문에,

 

select * from events_per_window

혹은

spark.sql("select * from events_per_window").show()

 

 

 

 

 

- 워터마크 박는 방법

import org.apache.spark.sql.functions.{window, col}

spark.conf.set("spark.sql.shuffle.partitions", 5)

val path = "/data"

val schema = spark.read.format("json").load(path+"/mydata").schema

val streaming = spark.readStream.format("json").schema(schema).load(path)

val withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

withEventTime.withWatermark("event_time", "5 hours").groupBy(window(col("event_time"), "10 minutes"))

.count().writeStream.queryName("events_per_window").format("console").outputMode("complete").start().awaitTermination()

늦게 도착한 데이터를 다룰 때, 5시간 늦은 데이터까지만 수용하겠다는 의미의 워터마크.

 

 

 

 

 

- 스트림으로 날아오는 데이터 중 중복된 데이터들을 제거하고 하나만 남기는 방법 

import org.apache.spark.sql.functions.{window, col}

spark.conf.set("spark.sql.shuffle.partitions", 5)

val path = "/data"

val schema = spark.read.format("json").load(path+"/mydata").schema

val streaming = spark.readStream.format("json").schema(schema).load(path)

val query = streaming.select("Creation_Time").filter("Creation_Time%2 ==0") 

withEventTime.withWatermark("event_time", "5 seconds").dropDuplicates("User, "event_time").groupBy("User")

.count().writeStream.queryName("duplicated").format("console").outputMode("complete").start().awaitTermination()

 

 

 

 

 

- 스트림에 쿼리를 사용하는 방법

Creation_Time 이 짝수인 것만 뽑는 쿼리를 적용해보았다.

import org.apache.spark.sql.functions.{window, col}

spark.conf.set("spark.sql.shuffle.partitions", 5)

val path = "/data"

val schema = spark.read.format("json").load(path+"/mydata").schema

val streaming = spark.readStream.format("json").schema(schema).load(path)

 

// aggregation 없는 쿼리

val query = streaming.select("Creation_Time").filter("Creation_Time%2 ==0")

query.writeStream.format("console").start()

 

// aggregation 하는 쿼리

val query = streaming.groupBy("gt").count()

query.writeStream.format("console").start()

 

 

- 특정 포맷 파일 읽는 법

json 경우

val data = spark.read.format("json").load("/streaming_data_json")

 

csv 경우

val data = spark.read.format("csv")

.option("header", "true")

.option("inferSchema", "true") //schema 를 구체적으로 넣지 않고 이렇게 추측하게 하여 사용 가능

.load("/streaming_data_csv")

 

spark streaming 이 지원하는 파일은 text, csv, json, orc, parquet 이라고 함.

자세한 것은 spark structured streaming 공식 문서 참고

 

 

 

- 실시간 group by 예제

json 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다.

spark.conf.set("spark.sql.shuffle.partitions", 5)

val sch = spark.read.format("json").load("/streaming/activity_data.json").schema

val streaming = spark.readStream.schema(sch).option("maxFilesPerTrigger", 1).json("/streaming")

val activityCounts = streaming.groupBy("gt").count()

val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete")

activityQuery.start().awaitTermination

 

 

- 실시간 선택과 필터링 예제

json 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다.
구체적인 명령은 아래와 같다. 먼저 column 을 하나 만드는데 이름은 "stairs" 이고 gt 가 %stair% 이면 true, 아니면 false 값을 갖게 된다.
이 "stairs" column 이 true 인 것과 gt가 null 이 아닌 column 을 뽑도록 where 조건을 주었다.
또한 출력을 위한 column 들은 gt,model,arrival_time,creation_time Column 들만 선택하였다.

import org.apache.spark.sql.functions.expr

 

spark.conf.set("spark.sql.shuffle.partitions", 5)

val sch = spark.read.format("json").load("/streaming/activity_data.json").schema

val streaming = spark.readStream.schema(sch).option("maxFilesPerTrigger", 1).json("/streaming")

val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'")).where("stairs").where("gt is not null").select("gt", "model","arrival_time","creation_time").writeStream.queryName("simple_transform").format("console").outputMode("append").start().awaitTermination()

 

 

 

 

- 실시간 Text File 읽고 WordCount

text 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다.

spark.conf.set("spark.sql.shuffle.partitions", 5)

val str = spark.readStream.textFile("/streaming")

val words = str.as[String].flatMap(_.split(" "))

val wordcounts = words.groupBy("value").count()

val query = wordcounts.writeStream.outputMode("complete").format("console").start().awaitTermination()

complete mode 로 하기 때문에 spark streaming 으로 들어가는 모든 데이터들이 기억되어지고 갱신된다.

아래는 실제 console 화면

 

Trigger 는 10secs 로 주고, maxFilesPerTrigger 를 1, 2 로 나눠서 테스트해본 결과

import org.apache.spark.sql.streaming.Trigger

spark.conf.set("spark.sql.shuffle.partitions", 1)

val str = spark.readStream.option("maxFilesPerTrigger", [1 혹은 2] ).textFile("/streaming")

val words = str.as[String].flatMap(_.split(" "))

val wordcounts = words.groupBy("value").count()

val query = wordcounts.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).outputMode("complete").format("console").start().awaitTermination()

< maxFilesPerTrigger 값이 1일 때 >

< maxFilesPerTrigger 값이 2일 때>

 

 

 

 

+ Recent posts