이전 포스트에서는, Spark 에서 기본적으로 제공해주는 PageRank 코드(scala rdd 를 사용하여 구현되어있음) 를 이해해 보았다.

이번에는 rdd 가 아니라 Spark sql dataframe 을 이용하여 scala PageRank 를 구현해본다.

기본 환경과 데이터는 이전 포스트를 참고한다.



import scala.collection.mutable.WrappedArray


//중복이 없는 상태의 data를 읽어들임
val lines = spark.read.textFile("/data").distinct()

//source 와 destination 나누기
val links = lines.map{ s=>
     val parts = s.split("\\s+")
     (parts(0), parts(1))
     }

//첫번째 parts( "_1" ) 로 groupBy 하기.
val group = links.groupBy("_1").agg(collect_list(col("_2")).as("to"))

//rank 만들기
val rank = group.select("_1").withColumn("rank", lit(1.0))

//join and cvalue
val cvalue = group.join(rank, "_1").drop("_1")

//contribs
val contribs = cvalue.flatMap(row => {
      val array = row.get(0).asInstanceOf[WrappedArray[String]].toSeq
      val size = array.size
      array.map(x => (x, row.get(1).asInstanceOf[Double]/size))
      })

//rank 업데이트
val sum = contribs.groupBy("_1").sum("_2").as("sum")
val new_ranks = sum.map(row => (row.getString(0), row.getDouble(1)*0.85+0.15))

//확인
new_ranks.show

 

최종 결과물인 new_ranks 를 hdfs 에 쓰기 위해서 아래의 명령어를 가장 마지막에 실행한다.

 


new_ranks.write.format("csv").save("/out")

 

이 명령어를 실행하면, hdfs 의 /out 디렉토리에 결과값들을 넣는다.

 

 

Spark의 최대 장점인 메모리 사용을 위해, cache() 를 붙여야 하지만 일단 코드가 돌아가는지 알아보기 위해 없이 만들었다.

rank 계산을 위한 반복이 없는 코드라서 cache() 가 필요하진 않지만,

original code 처럼 반복문을 넣게 되면 반드시 cache() 가 필요하다.

추후에 cache() 를 붙여서 코드를 업데이트 하겠다.

+ Recent posts