mongo-hadoop connector 라이브러리를 사용해서 map reduce 실행 시 data를 mongo 로부터 가져올 수 있다.


mongo-hadoop connector : https://github.com/mongodb/mongo-hadoop 


자세한 사항은 https://eyeballs.tistory.com/6 참고.







MongoDB 를 사용하는 MapReduce 에서 Mongo 로 쿼리를 보내는 방법을 알아보자.


몽고의 dataset.data 에서 데이터를 가져오고,


data.out 에 데이터를 저장한다.


dataset.data 에 저장된 데이터

{ "_id" : 1, "value" : "D6DtsdEtDF" }

{ "_id" : 2, "value" : "jsJ2432NEH" }

{ "_id" : 3, "value" : "E0nqwqILXR" }

{ "_id" : 4, "value" : "lwLq7LD9wx" }

{ "_id" : 5, "value" : "pMpHXyjEp8" }

{ "_id" : 6, "value" : "x5XpW6vHFO" }

{ "_id" : 7, "value" : "BoLP2JQHjP" }

{ "_id" : 8, "value" : "GsFvaHgU9y" }

{ "_id" : 9, "value" : "WADyvzq2B9" }

{ "_id" : 10, "value" : "5pRSfw6ITN" }

{ "_id" : 11, "value" : "jxuGftDrVX" }

{ "_id" : 12, "value" : "aa299zMERE" }

{ "_id" : 13, "value" : "aQaHF6oRWh" }

{ "_id" : 14, "value" : "OvUaEjvOMZ" }

{ "_id" : 15, "value" : "mptsnyLoUy" }

{ "_id" : 16, "value" : "JGGfq12C2J" }

{ "_id" : 17, "value" : "Rl5wpMW5xW" }

{ "_id" : 18, "value" : "FEHurGtmlO" }

{ "_id" : 19, "value" : "N9Jd1aZ5nx" }

{ "_id" : 20, "value" : "LHwboHV0KL" } 



아래 코드를 기반으로 쿼리 보내는 방법을 설명한다.


import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.bson.BasicBSONObject; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.io.BSONWritable; import com.mongodb.hadoop.util.MongoConfigUtil; public class Workload_IO { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf, "mongodb://[mongo-router ip address]/dataset.data"); MongoConfigUtil.setOutputURI(conf, "mongodb://[mongo-router ip address]/dataset.out");

// make 'query' object here MongoConfigUtil.setQuery(conf, query); Job job = Job.getInstance(conf, "Worklaod_IO"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Integer.class); job.setOutputValueClass(BSONWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); job.waitForCompletion(true); } public static class Map extends Mapper<Integer, BasicDBObject, IntWritable, Text> { public void map(Integer key, BasicDBObject value, Context context) throws IOException, InterruptedException { context.write(new IntWritable(key), new Text(value.getString("value"))); } } public static class Reduce extends Reducer<IntWritable, Text, Integer, BSONWritable> { public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { BasicBSONObject output = new BasicBSONObject(); BSONWritable reduceResult = new BSONWritable(); Text val = values.iterator().next(); if (val != null) { output.put("value", val.toString()); reduceResult.setDoc(output); context.write(key.get(), reduceResult); } } } }


MongoConfigUtil.setQuery 의 

첫번째 인자는 Configuration object

두번째 인자는 String 혹은 DBObject 가 들어간다.


위에 // make 'query' object here 라고 되어 있는 곳에 쿼리 오브젝트를 만든다.





1. String 객체를 만들어서 쿼리를 보내는 방법

String query = "{_id:3}";


String query = "{_id:{$lt:5}}";



2. BasicDBObject 를 만들어서 보내는 방법

DBObject query = new BasicDBObject("_id", 3);


DBObject query = new BasicDBObject("_id", new BasicDBObject("$lt", 3))
	.append("_id", new BasicDBObject("$gt", 18)); //이렇게 하면 앞에 3 은 무시되고 뒤에 것이 적용됨.


DBObject query = new BasicDBObject("$or",
	new BasicDBObject[] { new BasicDBObject("_id", new BasicDBObject("$lt", 2)),
	new BasicDBObject("_id", new BasicDBObject("$gte", 7)) });



3. QueryBuilder 를 만들어서 보내는 방법

DBObject query = QueryBuilder.start("_id").lessThan(5).get();



여러 방법이 있다.






*mongo-hadoop connector 에서 aggregator query 를 보내는 API는 제공하지 않는다.


(참고 : https://groups.google.com/forum/#!topic/mongodb-user/4Us8yyoz7F8)


대신, mongo java driver 를 이용하여 aggregate 를 보낸 후 나온 결과값을 map reduce 에 이용할 수 있다.


자세한 내용은 https://eyeballs.tistory.com/43 여기 잘 정리해두었으니 참고.


결과가 주르륵 나오는 aggregate query의 맨 마지막에 $out 을 사용하면


해당 쿼리를 곧바로 새로운 mongodb document 에 저장해준다.


그럼 그 document를 map-reduce 에서 불러서 사용하면


mongo 에 aggregate query를 날린 결과를 map reduce 에서 사용하는 효과가 난다.










Hadoop 에서 지원하는 org.apache.hadoop.fs.FileSystem을 사용하면 된다.


읽을 때


try { Path pt = new Path("hdfs:data/path/is/here");// Location of file in HDFS FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); String line = null; while ((line = br.readLine()) != null) {     //do something here } } catch (Exception e) { }



위 처럼 읽으면 된다.


Path 에 hdfs:data/path/is/here 라고 되어있는 부분에


hdfs 에서 원하는 데이터의 path 를 넣으면 된다.


예를 들어보자.




아래 코드는 MR 을 두 번 하는데,


첫번째 MR 에서 숫자를 하나 만들고(key 별 value 의 개수를 구함) 그 숫자를 HDFS 에 저장.


두번째 MR 에서 위에서 센 숫자를 HDFS 에서 읽어서 사용(데이터의 뒤에 붙임)한다.  


input_data

0 1

0 2

2 1

2 0

2 3

3 2

1 3


input path in HDFS

hadoop fs -put input_data /data


path : /data


(즉, hdfs:/data 에 저장됨)


아래 코드로 Workload_IO.jar 파일을 만듦. (만드는 방법은 여기서 설명하지 않음)


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.common.collect.Iterators;

public class Workload_IO {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = job1Setting(conf, args);
		job.waitForCompletion(true);

		try {
			Path pt = new Path("hdfs:" + args[0] + ".temp/part-r-00000");// Location of file in HDFS
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
			String line = null;
			while ((line = br.readLine()) != null) {
				conf.set("test", line);
			}
		} catch (Exception e) {
		}

		Job job2 = job2Setting(conf, args);
		job2.waitForCompletion(true);
	}

	public static class Map extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			context.write(new Text("1"), value);
		}
	}

	public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			int size = Iterators.size(values.iterator());
			context.write(NullWritable.get(), new Text(String.valueOf(size)));
		}
	}

	public static class Map2 extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			String temp = key.toString() + " " + context.getConfiguration().get("test");
			context.write(new Text(temp), value);
		}
	}

	public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			for (Text value : values) {
				String fromHDFS = context.getConfiguration().get("test");
				if (fromHDFS == null)
					fromHDFS = "nothing";
				String temp = value.toString() + " " + fromHDFS;
				context.write(key, new Text(temp));
			}
		}
	}

	private static Job job1Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".temp"));
		return job;
	}

	private static Job job2Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO2");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map2.class);
		job.setReducerClass(Reduce2.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".out"));
		return job;
	}
}




아래 명령어로 MR 을 돌림

hadoop jar Workload_IO.jar Workload_IO /data 



결과


0 7     2 7

0 7     1 7

1 7     3 7

2 7     3 7

2 7     0 7

2 7     1 7

3 7     2 7 


HDFS 상태

hadoop fs -ls /


-rw-r--r--   3 root supergroup         28 2019-02-15 03:54 /data

drwxr-xr-x   - root supergroup          0 2019-02-15 07:25 /data.out

drwxr-xr-x   - root supergroup          0 2019-02-15 07:24 /data.temp 



/data : 원본 데이터.

/data.temp : 첫번째 MR 이 쓴 데이터.

/data.out : 두번째 MR 이 쓴 데이터. 각 key value 에 7(/data.temp/part-r-00000 값) 이 붙어있다.





위의 코드는 사용 예제이므로 커스터마이징 해서 사용하면 된다.




참고.

https://stackoverflow.com/questions/26209773/hadoop-map-reduce-read-a-text-file/26210291







katacoda 라는 사이트가 있는데, 여기서 Docker, Git, Kubernat



https://www.katacoda.com/





위와 같은 기술들을 배울 수 있다고 함.


웹에서 직접 실습하면서!


대박 :)


즐겁게 공부합시다 ㅎㅎ







Docker container 를 만들어서 ssh 접속을 해야 할 일이 있었다.



sudo docker run -itd --network my-net --name my-ubuntu --ip 10.0.2.10 -v /home/eye/data:/root my_ubuntu /bin/bash



위와 같은 식으로 -v 옵션을 줘서 local ( /home/eye/data/ )의 파일들을 my-ubuntu container 내부의 root 위치에 공유하고 싶었다.


my_ubuntu 이미지는 ssh keygen 을 미리 해 둔 상태의 이미지.


근데 막상 만들어서 들어가서


cd ~/.ssh


로 들어가보니, known_hosts 라는 파일만 있고 있어야 할 authorized_keys  config  id_rsa  id_rsa.pub 파일들이 없네.


container를 몇 번 만들고 지우고 하다보니 -v 옵션에서 문제가 있다는 걸 발견.



sudo docker run -itd --network my-net --name my-ubuntu --ip 10.0.2.10 -v /home/eye/data:/root/data my_ubuntu /bin/bash



/root 로 바로 주면 안 되나봄. 이유는 모르겠으나 뒤에 data를 붙이자

known_hosts 가 사라지고 authorized_keys  config  id_rsa  id_rsa.pub 파일들이 생김.






간단한 예를 통해 page rank 알고리즘을 이해해보기로 한다.




위와 같은 A,B,C,D 라는 인터넷 페이지가 있고,


각각의 페이지마다 서로를 향한 링크를 갖고 있다고 하자.


예를 들어 A 사이트에서는 B와 C로 가는 링크가 있고


C 사이트에서는 A,B,D 로 가는 링크가 있다.



page rank 알고리즘에서 rank 값을 계산하는 함수를 pr 라고 하자.


초기 pr 값은 1/(전체 페이지 개수) 로 정한다.


즉 


pr(A) = 1/4

pr(B) = 1/4

pr(C) = 1/4

pr(D) = 1/4


가 된다.


page의 rank를 갱신하는 방법은 이렇다.




< A 페이지의 rank 값을 갱신 > 




pr(A) 를 갱신해보자.


pr(A) = (1-DF)/(페이지 전체 개수) + DF * ((A로 향하는 페이지의 현재 pr 값 / A로 향하는 페이지의 out link 개수) 들의 합)


pr(A) = ( 1-DF ) / 4 + DF * ( pr(C) / 3 )


pr(A) = ( 1-0.85 ) / 4 + 0.85 * ( (1/4) / 3)



페이지는 모두 4 이므로 페이지의 전체 개수로써 사용되는 분모는 4이다.


A로 향하는 페이지는 C 하나 뿐이다.


C의 현재 pr 값은 초기에 정해두었던 1/4 이므로 pr(C) = 1/4 이다.


C 에서 뻗어나가는 링크가 총 3개(A,B,D 로 뻗음)이므로 마지막 분모는 3이 된다.



DF 가 뭔지 궁금할 것 같은데


DF는 Damping Factor의 약자이며 설명은 아래와 같다.



damping factor란 "어떤 마구잡이로 웹서핑을 하는 사람이 그 페이지에 만족을 못하고 다른 페이지로 가는 링크를 클릭할 확률" 이다. 


즉, damping factor가 1이면, 무한히 링크를 클릭한다는 뜻이고, 0이면 처음 방문한 페이지에서 무조건 멈추고 더 이상 클릭하지 않는다는 뜻이다. 


0.85이면, 85%의 확률로 다른 페이지를 클릭해볼 것이라는 뜻이다. 이 경우 15%의 확률에 걸리는 순간 클릭을 멈추고 그 페이지를 살펴본다.



따라서 pr(A) 를 계산해보면 0.108333333가 된다.


pr(A) = 0.108333333


이 갱신 값은 다른 페이지들의 rank 값이 갱신될 때 한꺼번에 갱신된다.





< B 페이지의 rank 값을 갱신 > 



pr(B) 를 갱신해보자.


방법은 A를 갱신했을 때와 같다.


pr(B) = (1-DF)/(페이지 전체 개수) + DF * ((B로 향하는 페이지의 현재 pr 값 / B로 향하는 페이지의 out link 개수) 들의 합)


pr(B) = ( 1-DF ) / 4 + DF * ( pr(A) / 2 + pr(C) / 3 )


pr(B) = ( 1-0.85 ) / 4 + 0.85 * ( (1/4) / 2 + (1/4) / 3)



B로 향하는 링크를 갖는 페이지는 A와 C 두 개이다.


A의 pr(A) 값은 초기값인 1/4 이다(위에서 갱신값을 구하긴 했지만 실제 갱신은 한꺼번에 이루어지므로 초기값이 사용됨)


A에서 뻗어나가는 링크의 개수가 2개(B,C로) 이므로 (1/4)/2 가 된다.


C의 pr(C) 값은 초기값인 1/4이다.


C에서 뻗어나가는 링크의 개수가 3개(A,B,D로) 이므로 (1/4)/3 이 된다.


위에서 구한 (1/4)/2 와 (1/4)/3을 모두 더한 값을 식에 적용하면 된다.


계산하면 pr(B)의 rank 값은 0.214583333 가 된다.


pr(B) = 0.214583333






< C 페이지의 rank 값을 갱신 >  



pr(C) 를 갱신해보자.


방법은 위와 같다.


pr(C) = (1-DF)/(페이지 전체 개수) + DF * ((C로 향하는 페이지의 현재 pr 값 / C로 향하는 페이지의 out link 개수) 들의 합)


pr(C) = ( 1-DF ) / 4 + DF * ( pr(A) / 2 + pr(D) / 1 )


pr(C) = ( 1-0.85 ) / 4 + 0.85 * ( (1/4) / 2 + (1/4) / 1)



C로 향하는 링크를 갖는 페이지는 A와 D 두 개이다.


A의 pr(A) 값은 초기값인 1/4 이다.


A에서 뻗어나가는 링크의 개수가 2개(B,C로) 이므로 (1/4)/2 가 된다.


D의 pr(D) 값은 초기값인 1/4이다.


D에서 뻗어나가는 링크의 개수가 1개(C로) 이므로 (1/4)/1 이 된다.


위에서 구한 (1/4)/2 와 (1/4)/1을 모두 더한 값을 식에 적용하면 된다.


계산하면 pr(C)의 rank 값은 0.35625 가 된다.


pr(C) = 0.35625






< D 페이지의 rank 값을 갱신 >  



pr(D) 를 갱신해보자.


방법은 위와 같다.


pr(D) = (1-DF)/(페이지 전체 개수) + DF * ((D로 향하는 페이지의 현재 pr 값 / D로 향하는 페이지의 out link 개수) 들의 합)


pr(D) = ( 1-DF ) / 4 + DF * ( pr(B) / 1 + pr(C) / 3 )


pr(D) = ( 1-0.85 ) / 4 + 0.85 * ( (1/4) / 1 + (1/4) / 3)



D로 향하는 링크를 갖는 페이지는 B와 C 두 개이다.


B의 pr(B) 값은 초기값인 1/4 이다.


B에서 뻗어나가는 링크의 개수가 1개(D로) 이므로 (1/4)/1 가 된다.


C의 pr(C) 값은 초기값인 1/4이다.


C에서 뻗어나가는 링크의 개수가 3개(A,B,D로) 이므로 (1/4)/3 이 된다.


위에서 구한 (1/4)/1 과 (1/4)/3을 모두 더한 값을 식에 적용하면 된다.


계산하면 pr(D)의 rank 값은 0.320833333 가 된다.


pr(D) = 0.320833333








이렇게 한 번 pr 값을 갱신하였다.


위에서는 초기값인 계산할 때 1/4를 계속 사용하였는데,


두번째로 rank를 갱신할 때는,


같은 계산식 안에, 위에서 새롭게 만든 rank 값들을 대신 사용한다.


즉, A의 rank 값을 두 번째로 갱신 할 때는 아래와 같은 식이 된다는 말이다.


pr(A) = ( 1-0.85 ) / 4 + 0.85 * ( 0.35625 / 3)


(C의 초기값인 1/4 대신, 새롭게 갱신된 rank 값인 0.35625 가 들어갔다.)







pr(A) = 0.108333333

pr(B) = 0.214583333

pr(C) = 0.35625

pr(D) = 0.320833333


rank 값이 높을수록 중요한 페이지가 된다.


여기선 C - D - B - A 순으로 rank 값이 높으므로 이 순서로 중요한 페이지가 된다.


즉 C가 가장 중요하고 A가 가장 덜 중요한 페이지가 된다.









참고한 곳 : 


https://sungmooncho.com/2012/08/26/pagerank/


https://www.youtube.com/watch?v=P8Kt6Abq_rM


http://pi.math.cornell.edu/~mec/Winter2009/RalucaRemus/Lecture3/lecture3.html



matrix 연산을 통해 페이지랭크 구현 참고 링크들

https://www.slideshare.net/ChenGengMa/a-hadoop-implementation-of-pagerank

https://www.cs.utah.edu/~jeffp/teaching/cs5140-S15/cs5140/L24-MR+PR.pdf

https://github.com/zonagit/MapReduceAndHadoop

https://www.youtube.com/watch?v=3_1h13PJkUs

http://www.dcs.bbk.ac.uk/~dell/teaching/cc/book/mmds/mmds_ch5_2.pdf

https://stanford.edu/~rezab/amdm/notes/lecture4.pdf

https://github.com/guillaume6pl/mr_pagerank

+ Recent posts