Hadoop 에서 지원하는 org.apache.hadoop.fs.FileSystem을 사용하면 된다.
읽을 때
try { Path pt = new Path("hdfs:data/path/is/here");// Location of file in HDFS FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); String line = null; while ((line = br.readLine()) != null) { //do something here } } catch (Exception e) { }
위 처럼 읽으면 된다.
Path 에 hdfs:data/path/is/here 라고 되어있는 부분에
hdfs 에서 원하는 데이터의 path 를 넣으면 된다.
예를 들어보자.
아래 코드는 MR 을 두 번 하는데,
첫번째 MR 에서 숫자를 하나 만들고(key 별 value 의 개수를 구함) 그 숫자를 HDFS 에 저장.
두번째 MR 에서 위에서 센 숫자를 HDFS 에서 읽어서 사용(데이터의 뒤에 붙임)한다.
input_data
0 1 0 2 2 1 2 0 2 3 3 2
1 3 |
input path in HDFS
hadoop fs -put input_data /data path : /data (즉, hdfs:/data 에 저장됨) |
아래 코드로 Workload_IO.jar 파일을 만듦. (만드는 방법은 여기서 설명하지 않음)
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.google.common.collect.Iterators; public class Workload_IO { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = job1Setting(conf, args); job.waitForCompletion(true); try { Path pt = new Path("hdfs:" + args[0] + ".temp/part-r-00000");// Location of file in HDFS FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); String line = null; while ((line = br.readLine()) != null) { conf.set("test", line); } } catch (Exception e) { } Job job2 = job2Setting(conf, args); job2.waitForCompletion(true); } public static class Map extends Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(new Text("1"), value); } } public static class Reduce extends Reducer<Text, Text, NullWritable, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int size = Iterators.size(values.iterator()); context.write(NullWritable.get(), new Text(String.valueOf(size))); } } public static class Map2 extends Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, Context context) throws IOException, InterruptedException { String temp = key.toString() + " " + context.getConfiguration().get("test"); context.write(new Text(temp), value); } } public static class Reduce2 extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { String fromHDFS = context.getConfiguration().get("test"); if (fromHDFS == null) fromHDFS = "nothing"; String temp = value.toString() + " " + fromHDFS; context.write(key, new Text(temp)); } } } private static Job job1Setting(Configuration conf, String[] args) throws IOException { Job job = Job.getInstance(conf, "Worklaod_IO"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[0] + ".temp")); return job; } private static Job job2Setting(Configuration conf, String[] args) throws IOException { Job job = Job.getInstance(conf, "Worklaod_IO2"); job.setJarByClass(Workload_IO.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map2.class); job.setReducerClass(Reduce2.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[0] + ".out")); return job; } }
아래 명령어로 MR 을 돌림
hadoop jar Workload_IO.jar Workload_IO /data |
결과
0 7 2 7 0 7 1 7 1 7 3 7 2 7 3 7 2 7 0 7 2 7 1 7 3 7 2 7 |
HDFS 상태
hadoop fs -ls / -rw-r--r-- 3 root supergroup 28 2019-02-15 03:54 /data drwxr-xr-x - root supergroup 0 2019-02-15 07:25 /data.out drwxr-xr-x - root supergroup 0 2019-02-15 07:24 /data.temp |
/data : 원본 데이터.
/data.temp : 첫번째 MR 이 쓴 데이터.
/data.out : 두번째 MR 이 쓴 데이터. 각 key value 에 7(/data.temp/part-r-00000 값) 이 붙어있다.
위의 코드는 사용 예제이므로 커스터마이징 해서 사용하면 된다.
참고.
https://stackoverflow.com/questions/26209773/hadoop-map-reduce-read-a-text-file/26210291
'Hadoop' 카테고리의 다른 글
[Hadoop] 하둡과 MongoDB 연동하는 코드 (0) | 2019.03.21 |
---|---|
[Hadoop] mongo-hadoop connector 에서 MongoDB 에 쿼리 보내기 (0) | 2019.02.15 |
[Hadoop] 공식 map-reduce examples (0) | 2019.02.11 |
[Hadoop] Ubuntu 에서 Hadoop 설치하고 실행하는 튜토리얼 사이트 (0) | 2019.01.28 |
[Hadoop] 19888 포트를 사용하는 job history 서버 실행 방법 (0) | 2019.01.18 |