一个简单的mapreduce程序全排列

全排序其实就是全局排序,就是使得所有数据按序排列输出,和我们平常做的给一个数组排序没有什么区别,唯一的区别就是数据量的不同,这里涉及的数据量是TB级别的,这就意味着不可能简单地把数据加载进内存进行排序,需要用到分布式计算,所以就产生了Hadoop的全排序,Hadoop的全排序在实际应用有着重要的作用。

1)准备数据(数据已经进行过分组聚合操作):
一个简单的mapreduce程序全排列
2)封装文件各字段:

package com.Sort.Whole;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
private Float upFlow; //NA_Sales字段
private Float downFlow; //Other_Sales字段
private Float sumFlow; //sum_Sales字段

public FlowBean() {
super();
}

public FlowBean(Float upFlow, Float downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

**//比较,以sumFlow进行从大到小排序**
public int compareTo(FlowBean flowBean) {
int result;
if (sumFlow > flowBean.getSumFlow()){
result = -1;
}else if(sumFlow < flowBean.getSumFlow()){
result = 1;
}else{
result = 0;
}
return result;
}

//序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeFloat(upFlow);
dataOutput.writeFloat(downFlow);
dataOutput.writeFloat(sumFlow);
}

//反序列
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readFloat();
downFlow = dataInput.readFloat();
sumFlow = dataInput.readFloat();
}

public Float getUpFlow() {
return upFlow;
}

public void setUpFlow(Float upFlow) {
this.upFlow = upFlow;
}

public Float getDownFlow() {
return downFlow;
}

public void setDownFlow(Float downFlow) {
this.downFlow = downFlow;
}

public Float getSumFlow() {
return sumFlow;
}

public void setSumFlow(Float sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow +
"\t" + downFlow +
"\t" + sumFlow
;
}
}

3) Map程序,我们mapreduce程序以key进行排序(如果不指定排序规则,则以字典作为排序规则),那我们可以将之前的value作为key,key作为value来进行排序,那么该键值对就是Map程序的输出,Reduce程序的输入;

package com.Sort.Whole;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMappper extends Mapper<LongWritable, Text,FlowBean,Text> {
**//LongWritable是为偏移量,Text是每一行的内容,FlowBean封装upFlow,downFlow,sumFlow三个字段(第二字段开始到第四个字段),Text是每一行的第一个字段**
FlowBean k = new FlowBean();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Action 877.83 1751.18 2629.01

//1. 获取一行
String line = value.toString();

//2. 切割
String[] fields = line.split(",");

//3.封装对象
String Genre = fields[0];
Float upFlow = Float.parseFloat(fields[fields.length-3]);
Float downFlow = Float.parseFloat(fields[fields.length-2]);
Float sumFlow = Float.parseFloat(fields[fields.length-1]);

k.setDownFlow(downFlow);
k.setUpFlow(upFlow);
k.setSumFlow(sumFlow);

v.set(Genre);

//4.写出
context.write(k,v);

}
}

    Reduce程序,它的输入既是Map程序的输出,它的输出key应是源文件的第一个字段,value是源文件中的其他字段
package com.Sort.Whole;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

//Action 877.83 1751.18 2629.01

for (Text value : values) {
context.write(value, key);
}
}
}
**5)主程序: **

```java
package com.Sort.Whole;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//这里请注意:Workerhdfs目录下不能有output4文件
args = new String[]{"E:\\谷歌文件\\谷歌数据集\\","F:\\scala\\Workerhdfs\\output4"};
//1.获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.设置jar路径
job.setJarByClass(FlowCountSortDriver.class);

//3.关联mapper和reducer
job.setMapperClass(FlowCountSortMappper.class);
job.setReducerClass(FlowCountSortReducer.class);
//4 设置mapper输出的key和value类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

//5. 设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

//6.设置输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

运行结果如下:
一个简单的mapreduce程序全排列
part-r-00000文件内容,第四字段按照从大到小排列,简单的排列成功

原创:https://www.panoramacn.com
源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

一个简单的mapreduce程序全排列

免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
未经允许不得转载:书荒源码源码网每日更新网站源码模板! » 一个简单的mapreduce程序全排列
关注我们小说电影免费看
关注我们,获取更多的全网素材资源,有趣有料!
120000+人已关注
分享到:
赞(0) 打赏

评论抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

您的打赏就是我分享的动力!

支付宝扫一扫打赏

微信扫一扫打赏