新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

MapReduce如何实现自定义排序-创新互联

MapReduce概念

成都创新互联公司专注于桐梓企业网站建设,响应式网站建设,商城网站开发。桐梓网站建设公司,为桐梓等地区提供建站服务。全流程按需定制设计,专业设计,全程项目跟踪,成都创新互联公司专业和态度为您提供的服务

是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

MapReduce提供了以下的主要功能:

1)数据划分和计算任务调度:

系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并 负责Map节点执行的同步控制。

2)数据/代码互定位:

为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。

3)系统优化:

为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个 Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

4)出错检测和恢复:

以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。

测试文本:

tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000

   MapReduce中,根据key进行分区、排序、分组
MapReduce会按照基本类型对应的key进行排序,如int类型的IntWritable,long类型的LongWritable,Text类型,默认升序排序
   为什么要自定义排序规则?现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序
以Text类型为例:
MapReduce如何实现自定义排序
MapReduce如何实现自定义排序
MapReduce如何实现自定义排序
MapReduce如何实现自定义排序
Text类实现了WritableComparable接口,并且有write()readFields()compare()方法
readFields()方法:用来反序列化操作
write()方法:用来序列化操作
所以要想自定义类型用来排序需要有以上的方法
自定义类代码

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Person implements WritableComparable {
   private String name;
   private int age;
   private int salary;
   public Person() {
   }
   public Person(String name, int age, int salary) {
     //super();
     this.name = name;
     this.age = age;
     this.salary = salary;
   }
   public String getName() {
     return name;
   }
   public void setName(String name) {
     this.name = name;
   }
   public int getAge() {
     return age;
   }
   public void setAge(int age) {
     this.age = age;
   }
   public int getSalary() {
     return salary;
   }
   public void setSalary(int salary) {
     this.salary = salary;
   }
   @Override
   public String toString() {
     return this.salary + "  " + this.age + "   " + this.name;
   }
   //先比较salary,高的排序在前;若相同,age小的在前
   public int compareTo(Person o) {
     int compareResult1= this.salary - o.salary;
     if(compareResult1 != 0) {
       return -compareResult1;
     } else {
       return this.age - o.age;
     }
   }
   //序列化,将NewKey转化成使用流传送的二进制
   public void write(DataOutput dataOutput) throws IOException {
     dataOutput.writeUTF(name);
     dataOutput.writeInt(age);
     dataOutput.writeInt(salary);
   }
   //使用in读字段的顺序,要与write方法中写的顺序保持一致
   public void readFields(DataInput dataInput) throws IOException {
     //read string
     this.name = dataInput.readUTF();
     this.age = dataInput.readInt();
     this.salary = dataInput.readInt();
   }

}

MapReuduce程序:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class  SecondarySort {
   public static void main(String[] args) throws Exception {
     System.setProperty("HADOOP_USER_NAME","hadoop2.7");
     Configuration configuration = new Configuration();
     //设置本地运行的mapreduce程序 jar包
     configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target\\com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
     Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());
     FileSystem fileSystem = FileSystem.get(URI.create(args[1]), configuration);
     if (fileSystem.exists(new Path(args[1]))) {
       fileSystem.delete(new Path(args[1]), true);
     }
     FileInputFormat.setInputPaths(job, new Path(args[0]));
     job.setMapperClass(MyMap.class);
     job.setMapOutputKeyClass(Person.class);
     job.setMapOutputValueClass(NullWritable.class);
     //设置reduce的个数
     job.setNumReduceTasks(1);
     job.setReducerClass(MyReduce.class);
     job.setOutputKeyClass(Person.class);
     job.setOutputValueClass(NullWritable.class);
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     job.waitForCompletion(true);
   }
   public static class MyMap extends
       Mapper {
     //LongWritable:输入参数键类型,Text:输入参数值类型
     //Persion:输出参数键类型,NullWritable:输出参数值类型
     @Override
     //map的输出值是键值对,NullWritable说关心V的值
     protected void map(LongWritable key, Text value,
         Context context)
         throws IOException, InterruptedException {
       //LongWritable key:输入参数键值对的键,Text value:输入参数键值对的值
       //获得一行数据,输入参数的键(距首行的位置),Hadoop读取数据的时候逐行读取文本
       //fields:代表着文本一行的的数据
       String[] fields = value.toString().split(" ");
       // 本列中文本一行数据:nancy 22 8000
       String name = fields[0];
       //字符串转换成int
       int age = Integer.parseInt(fields[1]);
       int salary = Integer.parseInt(fields[2]);
       //在自定义类中进行比较
       Person person = new Person(name, age, salary);
       context.write(person, NullWritable.get());
     }
   }
   public static class MyReduce extends
       Reducer {
     @Override
     protected void reduce(Person key, Iterable values, Context context) throws IOException, InterruptedException {
       context.write(key, NullWritable.get());
     }
   }
}

运行结果:

40000  30   socrates
29000  39   white
11000  19   green
10000  19   stone
9000  22   ketty
8000  20   tom
8000  22   nancy

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


文章标题:MapReduce如何实现自定义排序-创新互联
网站链接:http://wjwzjz.com/article/jsshc.html
在线咨询
服务热线
服务热线:028-86922220
TOP