mongoDB cluster 세팅 방법은 여기 참고.

 

구체적으로 쿼리 보내는 방법은 여기 참고

 

aggregation 공식 문서 적극 참고

 

aggregation query 명령어 공식 문서 적극 참고

 

mongo-spark connector 에서 mongo 로 aggregation 쿼리를 보내서 spark 로 받는 예제 코드들이다.

아래 코드 따라 aggregation 코드를 보내면 된다.

 

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object IO_Mongo {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("mongo-spark query test")

      val sc = new SparkContext(conf)

      val rdd = MongoSpark.load(sc)

      //여기서 rdd 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] 

 

      val query = "{ $match : { _id : { $lt : 5 } } }"

 

      val ltfive = rdd.withPipeline(Seq(Document.parse(query)))

      //여기서 ltfive 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] 

      val df = ltfive.toDF()

      df.show()

  }

}

 

 

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object IO_Mongo {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("mongo-spark query test")

      val sc = new SparkContext(conf)

      val rdd = MongoSpark.load(sc)

      val query = "{ $match : { _id : { $lt : 5 } } }"

 

      val ltfive = rdd.withPipeline(Seq(Document.parse(query))).map(

        (doc : Document) =>{

           (doc.getString("data"))

        })

      //여기서 ltfive 는 org.apache.spark.rdd.RDD[String]

      val df = ltfive.toDF()

      df.show()

  }

}

 

 

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object IO_Mongo {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("mongo-spark query test")

      val sc = new SparkContext(conf)

      val rdd = MongoSpark.load(sc)

      val query = "{ $match : { _id : { $lt : 5 } } }"

 

      val ltfive = rdd.withPipeline(Seq(Document.parse(query))).map(

        (doc : Document) =>{

          (doc.getInteger("_id"), doc.getString("data"))

        })

      //여기서 ltfive 는 org.apache.spark.rdd.RDD[(Integer, String)]

      val df = ltfive.toDF()

      df.show()

  }

}

 

 

여러 쿼리를 이어서 사용하고 싶을 때

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object IO_Mongo {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("mongo-spark query test")

      val sc = new SparkContext(conf)

      val rdd = MongoSpark.load(sc)

      val query = "{ $match : { _id : { $lt : 5 } } }"

      val query2 = "{ $project: { _id:0 } }"

 

      val ltfive = rdd.withPipeline(Seq(Document.parse(query),Document.parse(query2))).map(

        (doc : Document) =>{

          (doc.getString("data"))

        })

      //여기서 ltfive 는 org.apache.spark.rdd.RDD[String]

      val df = ltfive.toDF()

      df.show()

  }

}

 

 

word count 하고 싶을 때

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

 

val rdd = MongoSpark.load(sc)

val q1= "{$project: {words: {$split: ['$data', ' ']}}}"

val q2= "{$unwind: '$words'}"

val words = rdd.withPipeline(Seq(Document.parse(q1), Document.parse(q2))).map((doc:Document)=>{(doc.getString("words"))}).toDF()

val count = words.groupBy("value").count()

 

 

 

+ Recent posts