mongo-hadoop connector 라이브러리를 사용해서 map reduce 실행 시 data를 mongo 로부터 가져올 수 있다.


mongo-hadoop connector : https://github.com/mongodb/mongo-hadoop 


자세한 사항은 https://eyeballs.tistory.com/6 참고.







MongoDB 를 사용하는 MapReduce 에서 Mongo 로 쿼리를 보내는 방법을 알아보자.


몽고의 dataset.data 에서 데이터를 가져오고,


data.out 에 데이터를 저장한다.


dataset.data 에 저장된 데이터

{ "_id" : 1, "value" : "D6DtsdEtDF" }

{ "_id" : 2, "value" : "jsJ2432NEH" }

{ "_id" : 3, "value" : "E0nqwqILXR" }

{ "_id" : 4, "value" : "lwLq7LD9wx" }

{ "_id" : 5, "value" : "pMpHXyjEp8" }

{ "_id" : 6, "value" : "x5XpW6vHFO" }

{ "_id" : 7, "value" : "BoLP2JQHjP" }

{ "_id" : 8, "value" : "GsFvaHgU9y" }

{ "_id" : 9, "value" : "WADyvzq2B9" }

{ "_id" : 10, "value" : "5pRSfw6ITN" }

{ "_id" : 11, "value" : "jxuGftDrVX" }

{ "_id" : 12, "value" : "aa299zMERE" }

{ "_id" : 13, "value" : "aQaHF6oRWh" }

{ "_id" : 14, "value" : "OvUaEjvOMZ" }

{ "_id" : 15, "value" : "mptsnyLoUy" }

{ "_id" : 16, "value" : "JGGfq12C2J" }

{ "_id" : 17, "value" : "Rl5wpMW5xW" }

{ "_id" : 18, "value" : "FEHurGtmlO" }

{ "_id" : 19, "value" : "N9Jd1aZ5nx" }

{ "_id" : 20, "value" : "LHwboHV0KL" } 



아래 코드를 기반으로 쿼리 보내는 방법을 설명한다.


import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.bson.BasicBSONObject; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.io.BSONWritable; import com.mongodb.hadoop.util.MongoConfigUtil; public class Workload_IO { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf, "mongodb://[mongo-router ip address]/dataset.data"); MongoConfigUtil.setOutputURI(conf, "mongodb://[mongo-router ip address]/dataset.out");

// make 'query' object here MongoConfigUtil.setQuery(conf, query); Job job = Job.getInstance(conf, "Worklaod_IO"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Integer.class); job.setOutputValueClass(BSONWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); job.waitForCompletion(true); } public static class Map extends Mapper<Integer, BasicDBObject, IntWritable, Text> { public void map(Integer key, BasicDBObject value, Context context) throws IOException, InterruptedException { context.write(new IntWritable(key), new Text(value.getString("value"))); } } public static class Reduce extends Reducer<IntWritable, Text, Integer, BSONWritable> { public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { BasicBSONObject output = new BasicBSONObject(); BSONWritable reduceResult = new BSONWritable(); Text val = values.iterator().next(); if (val != null) { output.put("value", val.toString()); reduceResult.setDoc(output); context.write(key.get(), reduceResult); } } } }


MongoConfigUtil.setQuery 의 

첫번째 인자는 Configuration object

두번째 인자는 String 혹은 DBObject 가 들어간다.


위에 // make 'query' object here 라고 되어 있는 곳에 쿼리 오브젝트를 만든다.





1. String 객체를 만들어서 쿼리를 보내는 방법

String query = "{_id:3}";


String query = "{_id:{$lt:5}}";



2. BasicDBObject 를 만들어서 보내는 방법

DBObject query = new BasicDBObject("_id", 3);


DBObject query = new BasicDBObject("_id", new BasicDBObject("$lt", 3))
	.append("_id", new BasicDBObject("$gt", 18)); //이렇게 하면 앞에 3 은 무시되고 뒤에 것이 적용됨.


DBObject query = new BasicDBObject("$or",
	new BasicDBObject[] { new BasicDBObject("_id", new BasicDBObject("$lt", 2)),
	new BasicDBObject("_id", new BasicDBObject("$gte", 7)) });



3. QueryBuilder 를 만들어서 보내는 방법

DBObject query = QueryBuilder.start("_id").lessThan(5).get();



여러 방법이 있다.






*mongo-hadoop connector 에서 aggregator query 를 보내는 API는 제공하지 않는다.


(참고 : https://groups.google.com/forum/#!topic/mongodb-user/4Us8yyoz7F8)


대신, mongo java driver 를 이용하여 aggregate 를 보낸 후 나온 결과값을 map reduce 에 이용할 수 있다.


자세한 내용은 https://eyeballs.tistory.com/43 여기 잘 정리해두었으니 참고.


결과가 주르륵 나오는 aggregate query의 맨 마지막에 $out 을 사용하면


해당 쿼리를 곧바로 새로운 mongodb document 에 저장해준다.


그럼 그 document를 map-reduce 에서 불러서 사용하면


mongo 에 aggregate query를 날린 결과를 map reduce 에서 사용하는 효과가 난다.






+ Recent posts