제가 공부한 것을 적어두었습니다. 배우는 입장이기 때문에, 아래 내용이 틀린 것이 있을 수 있습니다.

 

 

 

spark streaming 을 다룰 때,

 

"window 가 10분이고 update 가 5분이다"

 

라고 말 하면 어떻게 이해해야 할까?

스파크 공식 문서의 예제를 토대로 이해해보자.

 

 

* 여기서 trigger 는 5분이다. trigger 란, 연산 결과 테이블을 출력하는 주기를 말한다.

그래서 위에 그림에서 5분마다 테이블을 출력하고 있다.

 

 

word count 를 하는 코드로 spark streaming 을 시작했다고 하자.

이 때 window 는 10분이고, update 는 5분이다.

 

window 가 10분이라는 소리는,

10분 동안 모인 데이터를 word count 한다는 의미이다.

 

이를테면 12:00 ~ 12:10 사이에 온 세 개의 데이터들 (12:02, 12:03, 12:07) 을

모두 모아서 word count 한 결과가

이것이다.

 

update 가 5분 이라는 소리는,

window 의 시작을 5분 주기로 하라는 의미이다.

5분 마다 슬라이딩 이라는 표현으로 바꿀 수 있다.

12:00 에 시작한 window 는 12:10 에 끝난다.

그 사이 12:05 에 window 를 하나 더 시작하라는 말이다.

그래서 12:05 ~ 12:15 의 window 가 하나 더 만들어진다.

 

위의 그림을 아래 표로 다시 이해해보자.

 

시간 출력 결과 설명
12:05
12:00 ~ 12:05 사이에 쌓인 데이터로 word count

 

window 가 12:00 ~ 12:10 이라서 10분 동안 데이터를 쌓아두고 있는데 trigger 에 의해 테이블을 출력해야 하는 상황이 와서, 5분 동안 쌓아둔 데이터들(12:02, 12:03)로라도 word count 를 해서 출력한다.
12:10
12:00 ~ 12:10 사이에 쌓인 데이터로 word count (window 끝)
12:05 ~ 12:10 사이에 쌓인 데이터로 word count

window 12:00 ~ 12:10 만큼 데이터가 쌓였다. 이제 이 window 는 자신의 기간(10분)만큼 데이터를 쌓았으니 더 쌓는 일은 없을 것이다. trigger 에 의해 여태껏 10분 동안 쌓은 데이터들(12:02, 12:03, 12:07)로 word count 를 하여 출력한다.

 

 

12:05 부터 시작(왜냐하면 update 가 5분이니까)하고 12:15에 끝나는 window 에서 데이터를 모으다가 12:10 에 trigger에 의해 테이블 출력해야 하는 상황이 되어서, 여태 5분 동안 쌓아둔 데이터(12:07)로라도 word count를 해서 출력한다.

12:15
12:00 ~ 12:10 사이에 쌓인 데이터로 word count (window 끝)
12:05 ~ 12:15 사이에 쌓인 데이터로 word count (window 끝)
12:10 ~ 12:15 사이에 쌓인 데이터로 word count

가장 첫 번째 output 테이블은 window가 이미 끝났으므로 더 이상 테이블을 갱신하지 않는다.

 

 

 

window 12:05 ~ 12:15 동안 쌓인 데이터들(12:07, 12:11, 12:13) 로 word count 를 한 결과 테이블이다.

 

 

 

 

12:10 부터 시작하고 12:20 에 끝나는 window 에서 데이터를 모으다가 12:15 에 trigger 에 의해 출력해야 하는 상황이 되어서, 여태 5분 동안 쌓아둔 데이터(12:11, 12:13)로라도 word count를 해서 출력한다.

 

 

위의 표 설명을 읽어본다면, 데이터가 window 만큼 쌓여서 처리되고,

5분 간격으로 update(sliding) 되는구나 라는 걸 이해할 수 있을 것이다.

 

그럼 위의 내용이 scala 코드에서는 어떻게 표현될까?

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

 

아래는 watermark 와 함께 있는 코드

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

 

자세한 코드는 공식 문서 참고

 

 

 

 

 

이해가 되지 않거나 내용 중에 틀린 것이 있다면 댓글과 태클 걸어주세요.

감사합니다.

+ Recent posts