In the previous post we learned how to analyse time-temperature statistics and generate a report with the max/min temperature for each city (MultipleOutputs&lt;Text, Text&gt;). In this post we will write a sample MapReduce program to understand how to remove duplicate records from a file. Download the sample input file.
Mapper class:- In the map method, we read the input file line by line, make the whole line the key of the mapper output and NullWritable.get() the value, and write the pair to the context object.
con.write(row, NullWritable.get());
After grouping, the mapper output will appear something like this (NW denotes NullWritable):
key | value list |
---|---|
<unique_employee_record_1> | <NW, NW, NW> |
<unique_employee_record_2> | <NW, NW> |
<unique_employee_record_3> | <NW, NW, NW, NW, NW> |
<unique_employee_record_4> | <NW, NW, NW> |
Reducer class:- In the reduce method, each unique_employee_record key is written to the context as the key, with NullWritable.get() as the value.
con.write(<unique_employee_record_1>, NullWritable.get());
Sample code for the mapper, reducer and driver classes
Mapper class :-
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RemoveDuplicateMapper extends Mapper<Object, Text, Text, NullWritable> {
    @Override
    public void map(Object key, Text row, Context con) {
        try {
            // Emit the whole input line as the key; duplicate lines collapse into one key at the reducer.
            con.write(row, NullWritable.get());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Reducer class :-
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RemoveDuplicateReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context con) {
        try {
            // Write each unique record (the key) exactly once.
            con.write(key, NullWritable.get());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Driver class :-
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RemoveDuplicateRecordsDriver {
    public static void main(String[] str) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "Duplicate removal");
            job.setMapperClass(RemoveDuplicateMapper.class);
            job.setReducerClass(RemoveDuplicateReducer.class);
            job.setJarByClass(RemoveDuplicateRecordsDriver.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(
                    "hdfs://localhost:54310/user/hduser1/employee_records_duplicates"));
            FileOutputFormat.setOutputPath(job, new Path(
                    "hdfs://localhost:54310/user/hduser1/testfs/output_employee1"));
            // Exit with 0 on success, 1 on failure.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Execute the driver class (make sure the Hadoop services are running, and modify the input file location as per your convenience). It will create an output directory containing the file part-r-00000 (with no duplicate records).
To upload the input file from the local file system to HDFS:
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -put /home/zytham/workspaceJuno/MapreduceSampleProject/employee_records_duplicates /user/hduser1/
Check the size of both the input file and the output file to verify that duplicate records have been removed. The input file is 13.1 K and the output file is 7.9 K.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -ls -h /user/hduser1/employee_records_duplicates
-rw-r--r-- 1 hduser1 supergroup 13.1 K 2015-12-12 22:50 /user/hduser1/employee_records_duplicates
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -ls -h /user/hduser1/testfs/output_employee1/part-r-00000
-rw-r--r-- 3 zytham supergroup 7.9 K 2015-12-12 22:56 /user/hduser1/testfs/output_employee1/part-r-00000
Let's modify this problem: how do we remove duplicate rows based on employee_id as the key (i.e. keep only the first record with a given employee_id)?
In the map method, make employee_id the key and the whole row the value, and write the pair to the context.
con.write(<emp_id>, row);
After grouping, the mapper output looks like this:
key | value list |
---|---|
<unique_employee_id_1> | <emp_record, emp_record, emp_record> |
<unique_employee_id_2> | <emp_record, emp_record> |
<unique_employee_id_3> | <emp_record, emp_record, emp_record, emp_record> |
<unique_employee_id_4> | <emp_record, emp_record, emp_record> |
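The post gives the map logic for this variant only in prose, so here is a minimal mapper sketch. The class name RemoveDuplicateByIdMapper, the tab delimiter and the assumption that employee_id is the first field are all illustrative; adjust them to the actual layout of the input file. Note that the driver shown above would also need job.setMapOutputValueClass(Text.class) for this variant, since the mapper now emits Text values.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: emit <employee_id, whole row> so that rows group by employee_id at the reducer.
// Assumes tab-separated records with employee_id in the first column.
public class RemoveDuplicateByIdMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text row, Context con) throws IOException, InterruptedException {
        String[] fields = row.toString().split("\t"); // assumed delimiter
        con.write(new Text(fields[0]), row);          // fields[0] assumed to be employee_id
    }
}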
In the reduce method, iterate over the value list for each key, take the first emp_record and write it to the context:
for (Text val : values) {
    context.write(val, NullWritable.get()); // keep only the first record for this employee_id
    break;
}
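For completeness, here is the loop above placed in a full reducer class. This is a minimal sketch; the class name RemoveDuplicateByIdReducer is assumed, and it pairs with the mapper sketch above (map output key and value both Text).

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: for each employee_id, write only the first record from the grouped values.
public class RemoveDuplicateByIdReducer extends Reducer<Text, Text, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(val, NullWritable.get()); // keep the first record
            break;                                  // ignore the remaining duplicates
        }
    }
}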