MongoDB 에서 읽은 데이터로 Hadoop Map-Reduce 를 돌리고, 결과를 다시 MongoDB 에 쓰는 것을 해보자.


MongoDB와 Hadoop 을 연결시켜주는 " mongo-hadoop connector " 를 사용한다.

https://github.com/mongodb/mongo-hadoop


아래 page 를 참고해서

core jar 파일과 mongo java driver 를 받아서, hadoop 의 class path 내에 넣는다.

https://github.com/mongodb/mongo-hadoop/wiki/MapReduce-Usage#installation


mongo-java-driver 와 mongo-hadoop-core 는 아래 페이지에서 받는다.

https://mvnrepository.com/artifact/org.mongodb




그러면 hadoop Map-Reduce 에서 해당 lib를 사용하여 MongoDB 에 접근이 가능하다.







아래 코드에서 사용할 데이터는 아래와 같다.


HDFS 일 때

asdgegwegwegw

wefewgrgevefrre

wegwfkwenfdmi

dfdsggwehwefee

..... 


MongoDB 에 저장되어 있을 때

 _id    data

1        asdgegwegwegw

2       wefewgrgevefrre

3       wegwfkwenfdmi

4       dfdsggwehwefee

..... 





먼저  아주 간단한 Hadoop Map-Reduce Code 를 보자.

이 코드는 읽은 데이터를 그대로 출력하는 코드이다.



 


import
java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Workload_IO { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Worklaod_IO"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } public static class Map extends Mapper<Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { context.write(new Text(key.toString()), value); } } public static class Reduce extends Reducer<Text, Text, NullWritable, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text val = values.iterator().next(); if (val != null) { context.write(NullWritable.get(), val); } } } }


Map 쪽의 key 값은 Hadoop 이 알아서 만든 값이기 때문에 Reduce 로 모으는 용도로만 쓰고, output 에서는 쓰지 않는다.

output 의 key 값이 NullWritable 이다. 이 말인 즉슨, key 값에는 아무것도 쓰지 않는다는 말이다.

만약 NullWritable 이 아니라 String key = ""; 같은 것을 넣는다면 맨 앞에 \t 이 들어가게 된다.

아무것도 쓰고 싶지 않고 싶을 때 NullWritable.get() 을 사용하면 된다.



HDFS 에서 읽고 쓰는 경우


< Map Input > 

  key 타입 : Object

  value 타입 : Text


<Map Output>

  key 타입 : Text

  value 타입 : Text


<Reduce Input>

  key 타입 : Text

  value 타입 : Text


<Reduce Output>

  key 타입 : NullWritable

  value 타입 : Text



똑같은 일을 하지만 대신 HDFS 에서 데이터를 읽고 쓰는 게 아니라 MongoDB 에서 읽고 MongoDB에 쓰는 코드는 아래와 같다.


 


import
java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.bson.BasicBSONObject; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.io.BSONWritable; import com.mongodb.hadoop.util.MongoConfigUtil; public class Workload_IO { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf, "mongodb://" + args[0]); MongoConfigUtil.setOutputURI(conf, "mongodb://" + args[1]); Job job = Job.getInstance(conf, "Worklaod_IO"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Integer.class); job.setOutputValueClass(BSONWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); job.waitForCompletion(true); } public static class Map extends Mapper<Integer, BasicDBObject, IntWritable, Text> { public void map(Integer key, BasicDBObject value, Context context) throws IOException, InterruptedException { context.write(new IntWritable(key), new Text(value.getString("data"))); } } public static class Reduce extends Reducer<IntWritable, Text, Integer, BSONWritable> { public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { BasicBSONObject output = new BasicBSONObject(); BSONWritable reduceResult = new BSONWritable(); Text val = values.iterator().next(); if (val != null) { output.put("value", val.toString()); reduceResult.setDoc(output); context.write(key.get(), reduceResult); } } } }


다른 점만 bold 처리 해보았다.


하나 하나 살펴보자.

 



1.

 

MongoConfigUtil.setInputURI(conf, "mongodb://" + args[0]); MongoConfigUtil.setOutputURI(conf, "mongodb://" + args[1]);



args[0]에는 MongoDB 로부터 데이터를 읽어들일 input path

args[1] MongoDB 에 데이터를 쓸 output path 가 들어간다.


둘 다 표시 형식은  [mongodb의 router ip address]/[mongodb database name].[mongodb collection name] 이 들어간다.

예를 들어


MongoConfigUtil.setInputURI(conf, "mongodb://192.168.0.3/dataset.data");

MongoConfigUtil.setOutputURI(conf, "mongodb://192.168.0.3/dataset.out"); 



mongo router 는 192.168.0.3 의 ip 주소를 갖고 있다.

dataset 이라는 데이터베이스 내에 data 라는 collection 가 있고 여기서 데이터들(documents)를 읽어온다.

dataset 이라는 데이터베이스 내에 out 이라는 collection 에 결과값들을 쓴다.


_id 값이 Map 의 key 값으로 들어가고 그 외의 값들이 Map 의 value 값으로 들어간다.






2.

job.setOutputValueClass(BSONWritable.class); 

 


Reduce 의 output value 형식은 BSONWritable.class 여야 한다.

MongoDB 에서 BSON 형식으로 데이터가 이동하기 때문이다.







3.

job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); 

 


IO 의 포맷은 MongoInputFormat.class, MongoOutputFormat.class 여야 한다.

그래야 MongoDB 에서 읽고 쓸 수 있다.







4.

context.write(new IntWritable(key), new Text(value.getString("data")));   


Map 에서의 key 값은 MongoDB 의 _id 값이 된다.

BasicDBObject 로 날아온 MongoDB 의 데이터에서

getString, getInteger .. 등의 메소드를 통해 값을 읽어올 수 있다.

가령

{ _id : 2,

  name : "eyeballs",

  age : 20 }

인 데이터를 저장하고 있는 mongoDB 에서 데이터를 읽었을 때

key 는 2 가 된다.

value.getString("name") 의 return 값은 "eyeballs" 가 된다.

value.getInteger("age") 의 return 값은 20 이 된다.






5.

 

BasicBSONObject output = new BasicBSONObject(); BSONWritable reduceResult = new BSONWritable(); Text val = values.iterator().next(); if (val != null) { output.put("value", val.toString()); reduceResult.setDoc(output); context.write(key.get(), reduceResult); }


BSONWritable 형식으로 context 에 write 하는 방법이다.

output.put 의 첫번째 인자로, MongoDB 에 저장할 데이터 값의 "key" 가 들어가고,

두번째 인자로 저장할 데이터가 들어간다.








MongoDB 에서 읽고 쓰는 경우


< Map Input > 

  key 타입 : Integer (_id 가 1,2,3... 값이었기 때문에)

  value 타입 : BasicDBObject (MongoDB 에서 읽어올 때 이런 형식으로 )


<Map Output>

  key 타입 : IntWritable

  value 타입 : Text


<Reduce Input>

  key 타입 : IntWritable

  value 타입 : Text


<Reduce Output>

  key 타입 : Integer

  value 타입 : BSONWritable



쓴 후에 해당 데이터베이스 내의 컬렉션으로 가서 저장이 잘 되었는지 확인하자.



좀 더 많은 비교 코드를 보고 싶다면 아래 git 을 참고하자.

https://github.com/eyeballss/mongo-hadoop












+ Recent posts