제가 공부한 것을 적어두었습니다. 배우는 입장이기 때문에, 아래 내용이 틀린 것이 있을 수 있습니다.
spark streaming 을 다룰 때,
"window 가 10분이고 update 가 5분이다"
라고 말 하면 어떻게 이해해야 할까?
스파크 공식 문서의 예제를 토대로 이해해보자.
* 여기서 trigger 는 5분이다. trigger 란, 연산 결과 테이블을 출력하는 주기를 말한다.
그래서 위에 그림에서 5분마다 테이블을 출력하고 있다.
word count 를 하는 코드로 spark streaming 을 시작했다고 하자.
이 때 window 는 10분이고, update 는 5분이다.
window 가 10분이라는 소리는,
10분 동안 모인 데이터를 word count 한다는 의미이다.
이를테면 12:00 ~ 12:10 사이에 온 세 개의 데이터들 (12:02, 12:03, 12:07) 을
모두 모아서 word count 한 결과가
이것이다.
update 가 5분 이라는 소리는,
window 의 시작을 5분 주기로 하라는 의미이다.
5분 마다 슬라이딩 이라는 표현으로 바꿀 수 있다.
12:00 에 시작한 window 는 12:10 에 끝난다.
그 사이 12:05 에 window 를 하나 더 시작하라는 말이다.
그래서 12:05 ~ 12:15 의 window 가 하나 더 만들어진다.
위의 그림을 아래 표로 다시 이해해보자.
시간 | 출력 결과 | 설명 |
12:05 |
|
window 가 12:00 ~ 12:10 이라서 10분 동안 데이터를 쌓아두고 있는데 trigger 에 의해 테이블을 출력해야 하는 상황이 와서, 5분 동안 쌓아둔 데이터들(12:02, 12:03)로라도 word count 를 해서 출력한다. |
12:10 |
window 12:00 ~ 12:10 만큼 데이터가 쌓였다. 이제 이 window 는 자신의 기간(10분)만큼 데이터를 쌓았으니 더 쌓는 일은 없을 것이다. trigger 에 의해 여태껏 10분 동안 쌓은 데이터들(12:02, 12:03, 12:07)로 word count 를 하여 출력한다.
12:05 부터 시작(왜냐하면 update 가 5분이니까)하고 12:15에 끝나는 window 에서 데이터를 모으다가 12:10 에 trigger에 의해 테이블 출력해야 하는 상황이 되어서, 여태 5분 동안 쌓아둔 데이터(12:07)로라도 word count를 해서 출력한다. |
|
12:15 |
가장 첫 번째 output 테이블은 window가 이미 끝났으므로 더 이상 테이블을 갱신하지 않는다.
window 12:05 ~ 12:15 동안 쌓인 데이터들(12:07, 12:11, 12:13) 로 word count 를 한 결과 테이블이다.
12:10 부터 시작하고 12:20 에 끝나는 window 에서 데이터를 모으다가 12:15 에 trigger 에 의해 출력해야 하는 상황이 되어서, 여태 5분 동안 쌓아둔 데이터(12:11, 12:13)로라도 word count를 해서 출력한다. |
위의 표 설명을 읽어본다면, 데이터가 window 만큼 쌓여서 처리되고,
5분 간격으로 update(sliding) 되는구나 라는 걸 이해할 수 있을 것이다.
그럼 위의 내용이 scala 코드에서는 어떻게 표현될까?
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
아래는 watermark 와 함께 있는 코드
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
자세한 코드는 공식 문서 참고
이해가 되지 않거나 내용 중에 틀린 것이 있다면 댓글과 태클 걸어주세요.
감사합니다.
'Spark' 카테고리의 다른 글
[Spark] Docker 로 Spark 클러스터 구성하는 방법 (2) | 2019.08.30 |
---|---|
[Spark] 스칼라 DataFrame 다양한 연산 모음 (1) | 2019.08.28 |
[Spark] YARN 위에서 Application 을 실행하는 단계 (0) | 2019.07.25 |
[Spark] value toDF is not a member of org.apache.spark.rdd.RDD 에러 (0) | 2019.07.19 |
[Spark] MongoDB 와 연동시 Aggregate Query하는 방법 (0) | 2019.07.13 |