이전 포스트에서는, Spark 에서 기본적으로 제공해주는 PageRank 코드(scala rdd 를 사용하여 구현되어있음) 를 이해해 보았다.
이번에는 rdd 가 아니라 Spark sql dataframe 을 이용하여 scala PageRank 를 구현해본다.
기본 환경과 데이터는 이전 포스트를 참고한다.
import scala.collection.mutable.WrappedArray
//중복이 없는 상태의 data를 읽어들임
val lines = spark.read.textFile("/data").distinct()
//source 와 destination 나누기
val links = lines.map{ s=>
val parts = s.split("\\s+")
(parts(0), parts(1))
}
//첫번째 parts( "_1" ) 로 groupBy 하기.
val group = links.groupBy("_1").agg(collect_list(col("_2")).as("to"))
//rank 만들기
val rank = group.select("_1").withColumn("rank", lit(1.0))
//join and cvalue
val cvalue = group.join(rank, "_1").drop("_1")
//contribs
val contribs = cvalue.flatMap(row => {
val array = row.get(0).asInstanceOf[WrappedArray[String]].toSeq
val size = array.size
array.map(x => (x, row.get(1).asInstanceOf[Double]/size))
})
//rank 업데이트
val sum = contribs.groupBy("_1").sum("_2").as("sum")
val new_ranks = sum.map(row => (row.getString(0), row.getDouble(1)*0.85+0.15))
//확인
new_ranks.show
최종 결과물인 new_ranks 를 hdfs 에 쓰기 위해서 아래의 명령어를 가장 마지막에 실행한다.
new_ranks.write.format("csv").save("/out") |
이 명령어를 실행하면, hdfs 의 /out 디렉토리에 결과값들을 넣는다.
Spark의 최대 장점인 메모리 사용을 위해, cache() 를 붙여야 하지만 일단 코드가 돌아가는지 알아보기 위해 없이 만들었다.
rank 계산을 위한 반복이 없는 코드라서 cache() 가 필요하진 않지만,
original code 처럼 반복문을 넣게 되면 반드시 cache() 가 필요하다.
추후에 cache() 를 붙여서 코드를 업데이트 하겠다.
'Spark' 카테고리의 다른 글
[Scala] 공부하기 좋은 블로그 링크 (0) | 2019.06.17 |
---|---|
[Spark] Spark-SQL 튜토리얼/가이드 링크들(영어) (0) | 2019.06.17 |
[Spark] scala rdd 이용한 pagerank 알고리즘 이해하기 (0) | 2019.06.14 |
[Spark] RDD 에 대한 구체적인 설명 링크 (0) | 2019.05.23 |
[Spark] Ubuntu에 sbt 설치하는 법 (0) | 2019.05.21 |