Hadoop: How to Implement Data Sorting in Detail
Author: YinJuan791739156
In many business scenarios, the results produced by reading and analyzing raw data need to be sorted by a specified business field before they are written out, so that upstream applications can display or consume them directly and the cost of a second sort is avoided.
Sorting is Hadoop's default behavior: whether you need it or not, both the MapTask and the ReduceTask in MapReduce sort their output by key. The default order is dictionary (lexicographic) order, and the map-side sort is implemented with quicksort.
To customize the ordering, the key class implements the WritableComparable interface and overrides its compareTo method.
Several sorting scenarios are described below.
I. Full Sort
A full sort means the job ultimately produces a single output file, and the records inside that file are globally ordered. Keeping the default of a single ReduceTask is what guarantees one fully sorted output file.
1. Input Data
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
2. Bean Object
The bean implements WritableComparable and provides a compareTo method. WritableComparable is essentially the combination of two interfaces: Writable, which Hadoop custom serializable types must implement, and Comparable, which is required for comparison-based sorting.
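For reference, the interface adds no methods of its own; in the Hadoop source it is essentially just:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}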
package cn.nuwa.hap.cp;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
@NoArgsConstructor
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization and deserialization; the field order must be identical in both methods
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // Output format: upFlow \t downFlow \t sumFlow
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Compare by total traffic, descending
    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.getSumFlow()) {
            return -1;
        } else if (this.sumFlow < o.getSumFlow()) {
            return 1;
        } else {
            return 0;
        }
    }
}
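Incidentally, the descending comparison above can be written more compactly with Long.compare; this is only an equivalent alternative, not a change the article makes:

    // Equivalent descending comparison: reversing the arguments flips the order
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }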
3. Mapper Class
package cn.nuwa.hap.cp;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();

        // 2. Split the line on "\t"
        String[] split = line.split("\t");

        // 3. Fill in outK and outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        // 4. Emit the key-value pair
        context.write(outK, outV);
    }
}
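To make the field mapping concrete, this is how the first line of the sample input travels through map (a walk-through of the code above, nothing new to add):

    // Input line:  13470253144 \t 180 \t 180 \t 360
    // split[0] = "13470253144"  -> outV  (phone number, the map output value)
    // split[1] = "180"          -> outK.upFlow
    // split[2] = "180"          -> outK.downFlow
    // outK.setSumFlow()         -> sumFlow = 360
    // Because the FlowBean is the map output key, the shuffle sorts the records
    // with FlowBean.compareTo, i.e. by total flow in descending order.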
4. Reducer Class
package cn.nuwa.hap.cp;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Grouping also uses compareTo, so phone numbers with the same total flow
        // arrive in one reduce call; iterate over values so none of them is lost
        for (Text value : values) {
            // Swap key and value so the phone number is written first
            context.write(value, key);
        }
    }
}
5. Driver Class
package cn.nuwa.hap.cp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate this Driver class
        job.setJarByClass(FlowDriver.class);

        // 3. Associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the map output key/value types (the key is FlowBean, which drives the sort)
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\Dell\\Desktop\\hadoop\\inputFlow"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\Dell\\Desktop\\hadoop\\outputFlow"));

        // 7. Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
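The input and output paths above are hard-coded for a local Windows run. A common variation, shown here only as an illustration and not something the article does, is to take the paths from the command-line arguments; note that the output directory must not already exist, or Hadoop will refuse to start the job:

    // Hypothetical variation: read the paths from the program arguments instead of hard-coding them
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));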
6. Final Result
13509468723 7335 110349 117684
13975057813 11058 48243 59301
13568436656 3597 25635 29232
13736230513 2481 24681 27162
18390173782 9531 2412 11943
13630577991 6960 690 7650
15043685818 3659 3538 7197
13992314666 3008 3720 6728
15910133277 3156 2936 6092
13560439638 918 4938 5856
84188413 4116 1432 5548
13682846555 1938 2910 4848
18271575951 1527 2106 3633
15959002129 1938 180 2118
13590439668 1116 954 2070
13956435636 132 1512 1644
13470253144 180 180 360
13846544121 264 0 264
13729199489 240 0 240
13768778790 120 120 240
13966251146 240 0 240
II. Partition Sort
Partition sort essentially means:
- the data is split into partitions
- the data within each partition is ordered
It only takes adding partitioning code on top of the full-sort example above.
1. Partitioner Class
package cn.nuwa.hap.cp;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // Take the first three digits of the phone number
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        // Choose the partition number according to the prefix
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }

        // Return the partition number
        return partition;
    }
}
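A quick way to convince yourself of the mapping is to call getPartition directly. The phone numbers below come from the sample data; the little main method is only a local verification sketch, not part of the article's job code:

package cn.nuwa.hap.cp;

import org.apache.hadoop.io.Text;

// Minimal local check of the partition mapping (sketch only, not part of the job)
public class PartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner p = new ProvincePartitioner();
        System.out.println(p.getPartition(new FlowBean(), new Text("13630577991"), 5)); // 0 -> prefix "136"
        System.out.println(p.getPartition(new FlowBean(), new Text("13975057813"), 5)); // 3 -> prefix "139"
        System.out.println(p.getPartition(new FlowBean(), new Text("84188413"), 5));    // 4 -> all other prefixes
    }
}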
2. Partition Configuration Added to the Driver Class
// Set the custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);

// Set the matching number of ReduceTasks
job.setNumReduceTasks(5);
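The number of ReduceTasks has to line up with the partition numbers getPartition can return (0 through 4 here). The alternative values below illustrate general Hadoop behaviour and are not settings used in the article:

    // ProvincePartitioner returns partition numbers 0..4, so:
    job.setNumReduceTasks(5);    // matches exactly -> five part-r files, as in the results below
    // job.setNumReduceTasks(1); // the custom partitioner is bypassed; everything ends up in one file
    // job.setNumReduceTasks(2); // fails at runtime: partition numbers 2..4 are illegal for 2 reducers
    // job.setNumReduceTasks(6); // works, but part-r-00005 stays empty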
3. Results
part-r-00000
13630577991 6960 690 7650
13682846555 1938 2910 4848
part-r-00001
13736230513 2481 24681 27162
13729199489 240 0 240
13768778790 120 120 240
part-r-00002
13846544121 264 0 264
part-r-00003
13975057813 11058 48243 59301
13992314666 3008 3720 6728
13956435636 132 1512 1644
13966251146 240 0 240
part-r-00004
13509468723 7335 110349 117684
13568436656 3597 25635 29232
18390173782 9531 2412 11943
15043685818 3659 3538 7197
15910133277 3156 2936 6092
13560439638 918 4938 5856
84188413 4116 1432 5548
18271575951 1527 2106 3633
15959002129 1938 180 2118
13590439668 1116 954 2070
13470253144 180 180 360
This concludes this article on how to implement data sorting in Hadoop. For more on Hadoop data sorting, please search 脚本之家's earlier articles or continue browsing the related articles below, and we hope you will keep supporting 脚本之家.