mongoDB cluster 세팅 방법은 여기 참고.
구체적으로 쿼리 보내는 방법은 여기 참고
aggregation 공식 문서 적극 참고
aggregation query 명령어 공식 문서 적극 참고
mongo-spark connector 에서 mongo 로 aggregation 쿼리를 보내서 spark 로 받는 예제 코드들이다.
아래 코드 따라 aggregation 코드를 보내면 된다.
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark query test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) //여기서 rdd 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document]
val query = "{ $match : { _id : { $lt : 5 } } }"
val ltfive = rdd.withPipeline(Seq(Document.parse(query))) //여기서 ltfive 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] val df = ltfive.toDF() df.show() } }
|
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark query test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) val query = "{ $match : { _id : { $lt : 5 } } }"
val ltfive = rdd.withPipeline(Seq(Document.parse(query))).map( (doc : Document) =>{ (doc.getString("data")) }) //여기서 ltfive 는 org.apache.spark.rdd.RDD[String] val df = ltfive.toDF() df.show() } }
|
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark query test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) val query = "{ $match : { _id : { $lt : 5 } } }"
val ltfive = rdd.withPipeline(Seq(Document.parse(query))).map( (doc : Document) =>{ (doc.getInteger("_id"), doc.getString("data")) }) //여기서 ltfive 는 org.apache.spark.rdd.RDD[(Integer, String)] val df = ltfive.toDF() df.show() } }
|
여러 쿼리를 이어서 사용하고 싶을 때
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark query test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) val query = "{ $match : { _id : { $lt : 5 } } }" val query2 = "{ $project: { _id:0 } }"
val ltfive = rdd.withPipeline(Seq(Document.parse(query),Document.parse(query2))).map( (doc : Document) =>{ (doc.getString("data")) }) //여기서 ltfive 는 org.apache.spark.rdd.RDD[String] val df = ltfive.toDF() df.show() } }
|
word count 하고 싶을 때
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList
val rdd = MongoSpark.load(sc) val q1= "{$project: {words: {$split: ['$data', ' ']}}}" val q2= "{$unwind: '$words'}" val words = rdd.withPipeline(Seq(Document.parse(q1), Document.parse(q2))).map((doc:Document)=>{(doc.getString("words"))}).toDF() val count = words.groupBy("value").count()
|
'Spark' 카테고리의 다른 글
[Spark] 성능 튜닝 참고 링크들 (0) | 2019.11.21 |
---|---|
[Spark] Required executor memory (2048), overhead (384 MB), and PySpark memory (0 MB) is above the max threshold (2048 MB) of this cluster! 에러 원인 파악 및 해결 (0) | 2019.11.08 |
[Spark] Resource 제한하는 방법 (0) | 2019.11.05 |
[Spark] Master option 설명 (0) | 2019.11.02 |
[Spark] WebUI 의 duration 과 task time 이 왜 다른가? (0) | 2019.11.01 |