spark structured streaming 운용시 알아야 할 명령어들을 적어둔다.
독학하면서 적는 것이기 때문에 틀린 내용일 수 있다.
아래 코드에서 사용하는 데이터는 여기서 받을 수 있다.
- structured streaming 에서 파일 읽고 쓰는 방법 보여주는 아주 좋은 곳 링크
- spark streaming 데모영상 링크
-특정 위치 파일 읽는 법
hdfs 에서 읽을 땐
s3 는 모르겠지만, local 이나 hdfs 는 파일을 읽는 게 아니고, dir 내에 있는 파일"들"을 읽어온다. 따라서 파일이 아니라 위에 처럼 path 를 지정해줘야 한다. 하나의 파일이 추가되게 되면, 추가된 파일이 스트림에 흘러가 spark로 처리 될 것이다. 자세한 것은 이곳 참고 |
- streaming 은 뒷단의 Thread 위에서 동작하게 되는데, 이 때 streaming의 쿼리나 연산이 끝나지 않았음에도 Thread가 종료되는 상황을 막기 위해 awaitTermination() 명령어를 start() 이후에 붙여준다. 붙여줘야 드라이버 프로세스가 종료되는 상황을 막는다.
- SparkSession 을 통해 spark.streams.active 명령어를 실행하면, 현재 몇 개의 streaming 작업이 돌아가고 있는지 Array 로 확인이 가능하다.
- Schema 는 알아서 추리할 수 있도록 할 수 있지만, 되도록 직접 넣어주는 것이 좋다(지원을 안 한다거나 하는 이유였는데 지금 구체적으로 잊어버림). 하지만 직접 넣기에는 무리가 있으니, 아래처럼 Schema 를 가져오는 방법을 사용한다.
val schema = spark.read.format("json").load("/streaming_data/mydata").schema //read 로 읽은 파일의 schema를 가져온다. val df = spark.readStream.format("json").schema(schema).load("/streaming_data") //readStream 옵션에 넣어준다.
|
- trigger 는 아래처럼 넣을 수 있다.
import org.apache.spark.sql.streaming.Trigger
spark.conf.set("spark.sql.shuffle.partitions", 5) val path = "/data" val schema = spark.read.format("json").load(path+"/mydata").schema val streaming = spark.readStream.format("json").schema(schema).load(path)
//10초 마다 한 번씩 trigger val group = streaming.groupBy("gt").count() group.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) .format("console").outputMode("complete").start().awaitTermination()
//딱 한 번만 trigger val group = streaming.groupBy("gt").count() group.writeStream.trigger(Trigger.Once()) .format("console").outputMode("complete").start().awaitTermination() |
- event time 기준으로 스트리밍을 하려면 아래 코드처럼 하면 된다.
윈도우가 "event_time" column을 기준으로 10분이다.
import org.apache.spark.sql.functions.{window, col} spark.conf.set("spark.sql.shuffle.partitions", 5) val path = "/data" val schema = spark.read.format("json").load(path+"/mydata").schema val streaming = spark.readStream.format("json").schema(schema).load(path)
//아래 처럼 cast 연산을 통해 event time 을 "2015-02-23 10:21:55.327" 같은 형식으로 바꿔준다. val withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")
//예제 하나 withEventTime.groupBy(window(col("event_time"), "10 minutes")).count().writeStream.queryName("events_per_window").format("console").outputMode("complete").start().awaitTermination()
//예제 둘. withEventTime.groupBy(window(col("event_time"), "10 minutes"), col("User")).count().writeStream.queryName("events_per_window").format("memory").outputMode("complete").start().awaitTermination()
|
위와 같이 형식을 바꿔주는 이유는 스파크가 이게 시간이란 걸 알아듣게 하기 위해서임.
이 시간이 있는 테이블(withEventTime)을 기준으로 window 를 세팅하고 스트리밍 작업을 하면 됨.
만약 새로 추가된 데이터에서 (이벤트 시간 기준으로)늦게 온 데이터가 있다면
watermark 범위 내에 있을 때 스파크가 알아서 이벤트타임 순으로 처리 할 것이다.
watermark, window, trigger 을 이해하기 위하여 예제를 들어 자세한 설명한 페이지 참고
wateramrk 사용하는 방법은 아래 있음.
두번째 예제는 책에 있길래 적긴 했는데 무슨 의미인지 모르겠다.
직접 해봐도 실행도 안 되고..-_-;
- 바로 위에 있는 마지막 코드처럼 format 이 memory 인 경우, memory 에 생긴 결과를 보는 방법은 아래와 같다.
queryName 이 events_per_window 이기 때문에,
select * from events_per_window
혹은
spark.sql("select * from events_per_window").show()
- 워터마크 박는 방법
import org.apache.spark.sql.functions.{window, col} spark.conf.set("spark.sql.shuffle.partitions", 5) val path = "/data" val schema = spark.read.format("json").load(path+"/mydata").schema val streaming = spark.readStream.format("json").schema(schema).load(path) val withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time") withEventTime.withWatermark("event_time", "5 hours").groupBy(window(col("event_time"), "10 minutes")) .count().writeStream.queryName("events_per_window").format("console").outputMode("complete").start().awaitTermination() |
늦게 도착한 데이터를 다룰 때, 5시간 늦은 데이터까지만 수용하겠다는 의미의 워터마크.
- 스트림으로 날아오는 데이터 중 중복된 데이터들을 제거하고 하나만 남기는 방법
import org.apache.spark.sql.functions.{window, col} spark.conf.set("spark.sql.shuffle.partitions", 5) val path = "/data" val schema = spark.read.format("json").load(path+"/mydata").schema val streaming = spark.readStream.format("json").schema(schema).load(path) val query = streaming.select("Creation_Time").filter("Creation_Time%2 ==0") withEventTime.withWatermark("event_time", "5 seconds").dropDuplicates("User, "event_time").groupBy("User") .count().writeStream.queryName("duplicated").format("console").outputMode("complete").start().awaitTermination() |
- 스트림에 쿼리를 사용하는 방법
Creation_Time 이 짝수인 것만 뽑는 쿼리를 적용해보았다.
import org.apache.spark.sql.functions.{window, col} spark.conf.set("spark.sql.shuffle.partitions", 5) val path = "/data" val schema = spark.read.format("json").load(path+"/mydata").schema val streaming = spark.readStream.format("json").schema(schema).load(path)
// aggregation 없는 쿼리 val query = streaming.select("Creation_Time").filter("Creation_Time%2 ==0") query.writeStream.format("console").start()
// aggregation 하는 쿼리 val query = streaming.groupBy("gt").count() query.writeStream.format("console").start() |
- 특정 포맷 파일 읽는 법
json 의 경우 val data = spark.read.format("json").load("/streaming_data_json")
csv 의 경우 val data = spark.read.format("csv") .option("header", "true") .option("inferSchema", "true") //schema 를 구체적으로 넣지 않고 이렇게 추측하게 하여 사용 가능 .load("/streaming_data_csv")
spark streaming 이 지원하는 파일은 text, csv, json, orc, parquet 이라고 함. 자세한 것은 spark structured streaming 공식 문서 참고 |
- 실시간 group by 예제
json 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다. spark.conf.set("spark.sql.shuffle.partitions", 5) val sch = spark.read.format("json").load("/streaming/activity_data.json").schema val streaming = spark.readStream.schema(sch).option("maxFilesPerTrigger", 1).json("/streaming") val activityCounts = streaming.groupBy("gt").count() val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete") activityQuery.start().awaitTermination |
- 실시간 선택과 필터링 예제
json 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다. import org.apache.spark.sql.functions.expr
spark.conf.set("spark.sql.shuffle.partitions", 5) val sch = spark.read.format("json").load("/streaming/activity_data.json").schema val streaming = spark.readStream.schema(sch).option("maxFilesPerTrigger", 1).json("/streaming") val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'")).where("stairs").where("gt is not null").select("gt", "model","arrival_time","creation_time").writeStream.queryName("simple_transform").format("console").outputMode("append").start().awaitTermination() |
- 실시간 Text File 읽고 WordCount
text 데이터가 hdfs 상의 /streaming/ 에 위치하고 있다. spark.conf.set("spark.sql.shuffle.partitions", 5) val str = spark.readStream.textFile("/streaming") val words = str.as[String].flatMap(_.split(" ")) val wordcounts = words.groupBy("value").count() val query = wordcounts.writeStream.outputMode("complete").format("console").start().awaitTermination() |
complete mode 로 하기 때문에 spark streaming 으로 들어가는 모든 데이터들이 기억되어지고 갱신된다.
아래는 실제 console 화면
Trigger 는 10secs 로 주고, maxFilesPerTrigger 를 1, 2 로 나눠서 테스트해본 결과
import org.apache.spark.sql.streaming.Trigger spark.conf.set("spark.sql.shuffle.partitions", 1) val str = spark.readStream.option("maxFilesPerTrigger", [1 혹은 2] ).textFile("/streaming") val words = str.as[String].flatMap(_.split(" ")) val wordcounts = words.groupBy("value").count() val query = wordcounts.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).outputMode("complete").format("console").start().awaitTermination() |
< maxFilesPerTrigger 값이 1일 때 > |
< maxFilesPerTrigger 값이 2일 때>
|
'Spark' 카테고리의 다른 글
[Spark] Apache Spark on Kubernetes 기조 연설 (0) | 2019.09.10 |
---|---|
[Spark] Spark WebUI DAG 이해하기 (0) | 2019.09.09 |
[Spark Streaming] java.lang.IllegalArgumentException: Option 'basePath' must be a directory 에러 (0) | 2019.09.03 |
[Spark Streaming] Structured Streaming 공부에 도움 되는 사이트 (0) | 2019.09.03 |
[Spark] Docker 로 Spark 클러스터 구성하는 방법 (2) | 2019.08.30 |