MapReduce 二次排序和全排序
本帖最后由 javaanddonet 于 2018-3-30 16:20 编辑最近练习MR的二次排序和全排序。
先看输入文件:
aa 99
bb 98
cc 97
dd 96
aa 80
bb 70
cc 60
dd 77
aa 89
bb 91
cc 79
dd 83
aa 93
bb 94
cc 66
dd 86
3 60
4 22
5 10
6 50
3 81
4 75
5 67
6 27
3 19
4 51
5 69
6 33
3 43
4 64
5 46
6 36
我想让结果先按第一列升序排列,然后在这个基础上,在按照第二列降序排列,然后在取每一组的前两名。
我先考虑使用Map方法将所有key相同的值,聚集为<key,<v2,v4,v1,v3>>这种格式,然后在reduce阶段,对map的输出的value,转换为ArrayList<Int>数组,然后是JDK的Collections.sort(arrayList);进行排序。这样可以实现但是如果数据量很大,估计这个性能是个问题。
然后我查阅资料,需要自己定义一个map的输出类型,将输入文件的每一行值,当作一个key,然后把每一行第二列的值,当作一个value来交个map方法处理。Map的输出的KV是这种类型的:<<k1,v1,>, v1>;<<k2,v2,>, v2>。这就要自己去定义一个数据类型为KV对的数据类如下:然后在Map方法中,将这个数据类型提供给Map函数来使用,当作Map函数的输出的key的数据类型。我的问题来了
第一个问题:下面的这个自定义的数据类型中,我已经重写了compareTo方法,我发现这样已经ok了。已经实现类二次排序的功能。那为何我看网上很多人还要设置下面的这两个属性?并且在自己定义的实现类中重写了compare或compareTo方法。我感觉在自定义的数据类型中实现了两个字段的排序,这里不需要再次调用设置了。否则需要在这里进行重新配置实现。
job.setGroupingComparatorClass()//设置自定义分组的实现,当然这里会设置自己的实现类
job.setSortComparatorClass()//设置自定义排序的实现,当然这里会设置自己的实现类
第二个问题:不用设置reduce的实现,map任务完全可以完成二次排序了。问题网上很多人还要设置下面的reduce的实现?
job.setReducerClass(MyReduce.class)
第三个问题:设置分区的时候,我发现一个问题,如果设置了reduce的个数为1,那么即便是自定义了分区的实现,好像也不会调用实现类。因为我在自定义的分区中输出了一句话,真个MR运行完了,我发现没有输出我要输出的内容。而当我将reduce的个数设置2的时候,就会调用定义的分区实现类。也输出了我想输出内容。我能否这么认为:如果setNumReduceTasks设置为1,那么就不用自定义分区实现类了。因为即便是设置了,也不会调用。
job.setPartitionerClass(MyPartitionerClass.class);//设置分区
job.setNumReduceTasks(1)//设置reduce的个数
第四个问题:我想取排序后的每一组的前两名,怎么做?在reduce的迭代循环中,使用循环判定吗?如下代码中添加判断超过2就return不输出了?有没有类似于saprk中take(2)质量的函数?text.set(reduceKeyInPairKeyWritable.getFirst());
for(IntWritable valueIntWritable : reduceValueInIterable){
context.write(text, valueIntWritable);
}
//需要实现数据的序列化与反序列化,这样才能在多个节点之间传输数据!
//自定义组合数据类型,用与map和reduce的输入输出使用。
public class PairKeyWritable implements WritableComparable<PairKeyWritable>{
private String first;
private int second;
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public void set(String first, int second){
this.setFirst(first);
this.setSecond(second);
}
public PairKeyWritable(String first, int second) {
this.set(first, second);
}
// 在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
public PairKeyWritable() { }
//用于输出结果以tab键隔开
@Override
public String toString() {
// return this.getFirst() + "\t" + this.getSecond();//可以自己灵活定义对象toString的输出的方式
// return this.getFirst() + "\t";
return this.getFirst();
}
//该方法需要重写,因为在根据可以进行分区的时候,会使用到该方法如下所示,如果job设置了分区方法,则需要调用这个方法。
// /** Use {@link Object#hashCode()} to partition. */
// public int getPartition(K key, V value,
// int numReduceTasks) {
// return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
// }
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((first == null) ? 0 : first.hashCode());
result = prime * result + second;
return result;
}
//至于为何要重写hashCode和equals方法,看看这个帖子:hashcode方法的作用
//https://blog.csdn.net/anmoyyh/article/details/76019777
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PairKeyWritable other = (PairKeyWritable) obj;
if (first == null) {
if (other.first != null)
return false;
} else if (!first.equals(other.first))
return false;
if (second != other.second)
return false;
return true;
}
//注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.first);
out.writeInt(this.second);
}
//注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
//比较排序,先按照第一个字段排序,如果第一个自动排序完成后,再按照第二个字段排序。
@Override
public int compareTo(PairKeyWritable o) {
PairKeyWritable pairKeyWritable = (PairKeyWritable) o;
int firstCompareResult = this.getFirst().compareTo(pairKeyWritable.getFirst());
//如果比较结果大于0,返回1;如果小于0,返回-1;等于0,返回0。
//如果直接返回第一个字段比较结果,而不是用下面的if...else分别返回,那么最终的结果只有第一个字段是排序的,第二个字段是无序的。
// return firstCompareResult > 0 ? 1 : firstCompareResult < 0 ? -1 : 0;
if(firstCompareResult != 0){//如果比较结果不等0,表示两个值不相等,那么直接返回该比较结果,否则使用第二个字段进行比较。
return firstCompareResult > 0 ? 1 : -1;//升序
// return firstCompareResult > 0 ? -1 : 1;//降序
}else{
int secondCompareResult = this.getSecond() - pairKeyWritable.getSecond();//如果比较结果大于0,返回1; 如果小于0,返回-1; 如果等于0,返回0,表示两个要比较的值相等。
// return secondCompareResult > 0 ? 1 : secondCompareResult < 0 ? -1 : 0;//升序
return secondCompareResult > 0 ? -1 : secondCompareResult < 0 ? 1 : 0;//降序
}
}
}
Map函数:
public static class MyMap extends Mapper<LongWritable, Text, PairKeyWritable, IntWritable>{
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
PairKeyWritable pairKeyWritable = new PairKeyWritable();
String first = null;
int second = 0;
String[] lineStringArray = null;
String line = null;
// MapReduce框架每读一行数据就调用一次map方法
@Override
public void map(LongWritable mapKeyInLongWritable, Text mapValueInText, Context context) throws IOException, InterruptedException {
System.out.println("MyMap.........");
line = mapValueInText.toString();
lineStringArray = line.split(" ");
first = lineStringArray;
second = Integer.parseInt(lineStringArray);
pairKeyWritable.set(first, second);
context.write(pairKeyWritable, new IntWritable(second));
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
}
Reduce函数:
public static class MyReduce extends Reducer<PairKeyWritable, IntWritable, Text, IntWritable>{
@Override
protected void setup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
private Text text = new Text();
@Override
protected void reduce(PairKeyWritable reduceKeyInPairKeyWritable, Iterable<IntWritable> reduceValueInIterable, Context context) throws IOException, InterruptedException {
System.out.println("MyReduce====================");
//下面的for循环也可以正常显示已结果,这个更简洁。
// context.write(new Text(reduceKeyInPairKeyWritable.getFirst()), new IntWritable(reduceKeyInPairKeyWritable.getSecond()));
text.set(reduceKeyInPairKeyWritable.getFirst());
for(IntWritable valueIntWritable : reduceValueInIterable){
context.write(text, valueIntWritable);
}
}
@Override
protected void cleanup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
}
}
分区函数:问题?如果reduce个数设置1,自定义的分区不会调用吗?
public static class MyPartitionerClass extends Partitioner<PairKeyWritable, IntWritable>{
@Override
public int getPartition(PairKeyWritable mapKeyOut, IntWritable mapValueOut, int numPartitions) {
System.out.println("MyPartitionerClass====================");
System.out.println("numPartitions="+numPartitions);
//根据key的hash code进行分区。如果很大数据量需要考虑抽样分区,然后才能避免出现数据倾斜和访问热点的问题
System.out.println("mapKeyOut.getFirst()="+mapKeyOut.getFirst());
System.out.println("mapKeyOut.getFirst().hashCode()="+mapKeyOut.getFirst().hashCode());
//与reduce job的个数进相除然后取余数。
int partitionResult = (mapKeyOut.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
System.out.println("partitionResult="+partitionResult);
System.out.println("");
return partitionResult;
}
}
分组函数:这里有一个问题,见里面的注释。我在里面将Object转换一下不可以吗?
public static class MyGroupingComparatorClass extends WritableComparator{
//为何要这个方法?//必须要调用父类的构造器
protected MyGroupingComparatorClass() {
super(PairKeyWritable.class, true);//注册comparator
}
// public int compare(Object o1, Object o2) {
//为何上面的方法不行?必须要指定为:WritableComparable
@Override
public int compare(WritableComparable o1, WritableComparable o2) {
System.out.println("MyGroupingComparatorClass&&&&&&&&&&&&&&&&&&&&&&&&");
PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
int minus = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
return minus;
}
}
排序函数:
public static class MySortComparatorClass extends WritableComparator{
//为何要这个方法?//必须要调用父类的构造器
protected MySortComparatorClass() {
super(PairKeyWritable.class, true);//注册comparator
}
// public int compare(Object o1, Object o2) {
//为何上面的方法不行?必须要指定为:WritableComparable
@Override
public int compare(WritableComparable o1, WritableComparable o2) {
System.out.println("MySortComparatorClass###########################");
PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
int firstComapreResult = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
//分组内部进行排序,按照第二个字段进行排序,首先要保证是同一个组内,同一个组的标识就是第一个字段相同
if (firstComapreResult != 0){
return firstComapreResult > 0 ? 1 : -1;
} else {
int secondComapreResult = doubleSortBean_1.getSecond() - doubleSortBean_2.getSecond();
return secondComapreResult > 0 ? -1 : secondComapreResult < 0 ? 1 : 0;
}
}
}
接下来是全排序的问题。我在使用InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);进行全排序的时候,为何看不到随机抽样的文件在HDFS上面,全局排序的设置如下:
// 设置partition file全路径到conf
Path path = new Path(args);//随机抽样文件存放位置
TotalOrderPartitioner.setPartitionFile(configration, path);
// partitioner class设置成TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
// RandomSampler第一个参数表示key会被选中的概率,第二个参数是一个选取samples数,第三个参数是最大读取input splits数
RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
// 写partition file到mapreduce.totalorderpartitioner.path
InputSampler.writePartitionFile(job, sampler);
使用RandomSampler来做全局排序的时候,有一点比较恶心:要求Map的输入key和输出key的数据类型必须一致?这个太恶心了。我们还得自己重新定义InputFormat实现类。因为我的map输出的key是自定义的数据类型,而Map的输入key是使用的LongWritable类型的。除了自定义InputForMat实现类,还有其他的办法吗?
有大神指导一下吗? 非要用MR实现吗??? fly2015 发表于 2018-4-2 14:45
非要用MR实现吗???
不一定是要用MR。我只不过想深入研究一下。
我这两天研究了,发现这个MR 能写死人的。
转为hive里面的表,然后用HQL很简单了。
再说,好像没有公司用MR了吧。
页:
[1]