여기서 설명하는 코드는 여러 번 ranking 을 갱신하지 않습니다.

연구를 위해 rank 를 한 번만 구하는 코드이므로,

여러번 rank 를 업데이트하는 코드는 아래 코드를 기반으로 직접 만드셔야 합니다.

 

 

 

 

mongo-spark lib 를 이용하여 mongoDB 와 Spark 를 연동시키는 방법에 대해선 이전 포스트를 참고.

mongoDB 의 database는 dataset, collection은 data 인 위치에 아래와 같은 data를 넣었다고 가정하자.

_id    from    to

1        1        2

2        1        3

3        1        4

4        2        1

5        3        1

6        4        1

그리고 $SPARK_HOME/conf/spark-default.conf 는 다음과 같다고 하자.

spark.mongodb.input.uri=mongodb://mymongodb/dataset.data

spark.mongodb.output.uri=mongodb://mymongodb/dataset.out

(참고로 여기서 mymongodb 는 mongo router 의 ip 주소값)

 

 

page rank 코드는 이전 포스트를 참고한다.

 

아래 코드를 통해, mongodb로부터 데이터를 읽고 page rank를 구현할 수 있다.

mongodb에 aggregate와 기타 query 쿼리를 넣어서 page rank 를 구현해보았다.

query는 넣지 않아도 되지만, mongodb가 할 수 있는 일을 만들어주기 위해 넣어보았다.

query 는 group by 를 넣었다. mongodb 가 groupby 를 하고 난 후의 결과를 spark 에게 넘겨준다.

 

import com.mongodb.spark._
import org.bson.Document
import java.util.ArrayList
import scala.collection.JavaConversions._

val rdd = MongoSpark.load(sc)

//from 을 group 짓고, 나머지 to 들을 하나의 array로 만드는 쿼리
val groupQry = "{$group:{_id:'$from', to:{$push:'$to'}}}"

//groupQry의 쿼리 결과는 Document 형식이고, 이를 map 할 때 id와 to으로 나누는 파싱 작업을 함
val links = rdd.withPipeline(Seq(Document.parse(groupQry))).map((doc : Document)=>{
     (doc.getInteger("_id"), doc.get("to"))
     })

//from 만 가져와서 초기 rank 값을 정해줌
val ranks = links.map(x=>{
     (x._1, 1.0)
     })

//join
val cvalues = links.join(ranks).values

val contribs = cvalues.flatMap{case(urls,rank)=>
     val url_array = urls.asInstanceOf[ArrayList[Int]]
     val size = url_array.size
     url_array.map(url => (url, rank/size))
     }

//rank를 새롭게 갱신
val new_ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85* _)

//확인
new_ranks.collect

//mongodb 에 바로 저장. 저장할 땐 org.bson.Document 형태여야 함
val save = new_ranks.map(x=> (new Document(x._1.toString, x._2.toString)))
MongoSpark.save(save)

 

반면 group by 쿼리가 없는 코드는 아래와 같다.

순수하게 mongodb 의 데이터를 그대로 읽어들어와서 spark APIs 를 통해 page rank 알고리즘을 돌린다.

import com.mongodb.spark._
import org.bson.Document

val rdd = MongoSpark.load(sc)

val lines = rdd.map( (doc : Document) => {
      (doc.getInteger("from"), doc.getInteger("to"))
      })

val links = lines.map{ s=>
      (s._1, s._2)
      }.distinct().groupByKey().cache()

var ranks = links.mapValues(v => 1.0)

val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
}
val new_ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

val save = new_ranks.map(x=> (new Document(x._1.toString, x._2.toString)))

MongoSpark.save(save)

 

 

 

참고
https://dev4u.tistory.com/841
https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/

https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/
https://stackoverflow.com/questions/674713/converting-a-java-collection-into-a-scala-collection
https://stackoverflow.com/questions/49395627/mongospark-convert-bson-document-to-mapstring-double
https://docs.scala-lang.org/overviews/collections/conversions-between-java-and-scala-collections.html

https://stackoverflow.com/questions/48477651/writing-scala-spark-info-into-mongodb

+ Recent posts