여기서 설명하는 코드는 여러 번 ranking 을 갱신하지 않습니다.
연구를 위해 rank 를 한 번만 구하는 코드이므로,
여러번 rank 를 업데이트하는 코드는 아래 코드를 기반으로 직접 만드셔야 합니다.
mongo-spark lib 를 이용하여 mongoDB 와 Spark 를 연동시키는 방법에 대해선 이전 포스트를 참고.
mongoDB 의 database는 dataset, collection은 data 인 위치에 아래와 같은 data를 넣었다고 가정하자.
_id from to 1 1 2 2 1 3 3 1 4 4 2 1 5 3 1 6 4 1 |
그리고 $SPARK_HOME/conf/spark-default.conf 는 다음과 같다고 하자.
spark.mongodb.input.uri=mongodb://mymongodb/dataset.data spark.mongodb.output.uri=mongodb://mymongodb/dataset.out |
(참고로 여기서 mymongodb 는 mongo router 의 ip 주소값)
page rank 코드는 이전 포스트를 참고한다.
아래 코드를 통해, mongodb로부터 데이터를 읽고 page rank를 구현할 수 있다.
mongodb에 aggregate와 기타 query 쿼리를 넣어서 page rank 를 구현해보았다.
query는 넣지 않아도 되지만, mongodb가 할 수 있는 일을 만들어주기 위해 넣어보았다.
query 는 group by 를 넣었다. mongodb 가 groupby 를 하고 난 후의 결과를 spark 에게 넘겨준다.
|
반면 group by 쿼리가 없는 코드는 아래와 같다.
순수하게 mongodb 의 데이터를 그대로 읽어들어와서 spark APIs 를 통해 page rank 알고리즘을 돌린다.
import com.mongodb.spark._
import org.bson.Document
val rdd = MongoSpark.load(sc)
val lines = rdd.map( (doc : Document) => {
(doc.getInteger("from"), doc.getInteger("to"))
})
val links = lines.map{ s=>
(s._1, s._2)
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
val new_ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
val save = new_ranks.map(x=> (new Document(x._1.toString, x._2.toString)))
MongoSpark.save(save)
참고
https://dev4u.tistory.com/841
https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/
https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/
https://stackoverflow.com/questions/674713/converting-a-java-collection-into-a-scala-collection
https://stackoverflow.com/questions/49395627/mongospark-convert-bson-document-to-mapstring-double
https://docs.scala-lang.org/overviews/collections/conversions-between-java-and-scala-collections.html
https://stackoverflow.com/questions/48477651/writing-scala-spark-info-into-mongodb
'Spark' 카테고리의 다른 글
[Spark] value toDF is not a member of org.apache.spark.rdd.RDD 에러 (0) | 2019.07.19 |
---|---|
[Spark] MongoDB 와 연동시 Aggregate Query하는 방법 (0) | 2019.07.13 |
[Scala] 공부하기 좋은 블로그 링크 (0) | 2019.06.17 |
[Spark] Spark-SQL 튜토리얼/가이드 링크들(영어) (0) | 2019.06.17 |
[Spark] scala dataframe 이용한 pagerank 알고리즘 구현하기 (0) | 2019.06.14 |