아래 사이트를 적극 참고한다.
https://docs.mongodb.com/spark-connector/master/scala-api/
1. 일단 spark-shell 이 구동될 수 있는 환경을 조성한다(환경을 어떻게 조성하는지는 여기 글에 쓰지 않음).
2. mongo db에 접속할 수 있도록 url 를 만들어둔다(url 을 어떻게 만드는지 여기 글에 쓰지 않음).
3. spark-shell 을 아래 명령어를 통해 접속한다.
./bin/spark-shell --master yarn \ --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection" \ --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 |
위의 127.0.0.1 은 mongo db의 url 로 대체하고,
test.myCollection 은 mongodb 내의 사용하길 원하는 데이터가 있는 [database 이름].[collection 이름] 으로 대체한다.
--conf 를 이용하여 여러가지 옵션을 줄 수 있다.
옵션 처리는 아래 링크 참고
https://docs.mongodb.com/spark-connector/master/configuration/
sbt 를 이용하여 연동할 때 (spark-submit)
< build.sbt >
version := "1.0" scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "io_mongo.jar" }
|
위의 dependency 를 넣어주면 된다.
4. 인터넷이 연결되어 있다는 가정 하에, mongo-spark-connector 가 알아서 받아진다.
5. 아래 명령어로 com.mongodb.spark._ 를 import 한다.
import com.mongodb.spark._ |
6. 아래 명령어로 count가 잘 되는지 확인한다. 잘 된다면 mongodb랑 잘 연동되었다는 의미.
val rdd = MongoSpark.load(sc)
|
7. sbt (spark-submit) 을 이용한 코드 예제는 아래와 같다.
실제 Spark 코드상에서 아래처럼 mongoDB 의 데이터를 가져올 수 있다.
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark integration test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) //여기서 rdd 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] val df = rdd.toDF() df.show() } }
|
mongodb에서 읽고, mongodb에 쓰는 방법은 가장 위의 링크를 참고한다.
spark-mongo 에서 읽기, 쓰기 mongo uri 를 정하는 법
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name] spark.mongodb.output.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name] |
spark-mongo 에서 캐시를 얼마나 오랫동안 유지할 지 정하는 법
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.keep_alive_ms 0 0을 넣으면 0ms 동안 캐시를 유지한다. 기본은 5000ms. |
spark-mongo 에서 readPreference, partitioner 바꾸는 법
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.readPreference.name nearest spark.mongodb.input.partitioner MongoShardedPartitioner |
local threshold 바꾸는 법
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.localThreshold 15 기본은 15ms. |
참고할 만한 곳
https://docs.databricks.com/_static/notebooks/mongodb.html
https://www.slideshare.net/mongodb/how-to-connect-spark-to-your-own-datasource
'Spark' 카테고리의 다른 글
[Spark] scala rdd 이용한 pagerank 알고리즘 이해하기 (0) | 2019.06.14 |
---|---|
[Spark] RDD 에 대한 구체적인 설명 링크 (0) | 2019.05.23 |
[Spark] Ubuntu에 sbt 설치하는 법 (0) | 2019.05.21 |
[Spark] There are 0 datanode(s) running and no node(s) are excluded in this operation. 에러 (0) | 2019.05.20 |
[Spark] mongo lib 다운받는 곳 (0) | 2019.05.20 |