First of all, I am running everything on a single machine. Second, my program is as follows.
The Hadoop program:
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
    private Text word = new Text();
    private Text val = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString()); // unused
        String aa = value.toString();
        String[] b = aa.split("\t");
        if ((b.length > 10) && (b[1].length() > 9) && (b[3].length() > 6) && (b[9].length() > 17)) {
            word.set(b[3]);              // license plate number
            val.set(b[1] + "=" + b[9]);
            context.write(word, val);
            count++;                     // static counter (declaration not shown)
        }
    }
}
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
    private Text word = new Text();
    private Text outvals = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        count1++;                                // static counter (declaration not shown)
        Iterator val1 = values.iterator();       // unused
        int sum = 0, k = 7;                      // k is the detection-point threshold
        String temp = "";
        List<String> temp1 = new ArrayList<String>();
        for (Text val : values) {
            sum++;
            temp1.add(val.toString());
            temp += val.toString() + ";";        // temp is never used afterwards
        }
        if (sum > k) {
            for (int i = 0; i < temp1.size(); i++) {
                count2++;                        // static counter (declaration not shown)
                if (temp1.get(i).length() < 30)
                    continue;
                word.set(temp1.get(i).substring(0, 11));
                outvals.set(temp1.get(i).substring(12) + "=" + key.toString());
                context.write(word, outvals);    // one output record per qualifying value
            }
        }
    }
}
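For reference, count, count1 and count2 are used above but never declared; presumably they are static fields on the enclosing class, and the Mapper/Reducer are driven by a standard Job. A minimal sketch under those assumptions (the class name and output path are made up; plain static counters only add up here because the whole job runs in one local JVM, a real cluster would need Hadoop Counters):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PlateJob {                                   // class name is an assumption
    static long count = 0, count1 = 0, count2 = 0;        // counters incremented in the mapper/reducer above

    // TokenizerMapper and IntSumReducer from above go here

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "plate job");
        job.setJarByClass(PlateJob.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("D://1101.1"));           // same input as the Spark version
        FileOutputFormat.setOutputPath(job, new Path("D://output//hadoop")); // output path is an assumption
        boolean ok = job.waitForCompletion(true);
        System.out.println(count + " " + count1 + " " + count2);             // only meaningful in local mode
        System.exit(ok ? 0 : 1);
    }
}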
Converted to a Spark program, it looks like this:
JavaSparkContext jsc = new JavaSparkContext(sparkconf);
JavaPairRDD<String, String> mapreduce1 = jsc
        .textFile("D://1101.1")
        .filter(new Function<String, Boolean>() {        // same field checks as the Hadoop mapper
            @Override
            public Boolean call(String s) throws Exception {
                String[] str = s.split("\t");
                return str.length > 10 && str[1].length() > 9
                        && str[3].length() > 6 && str[9].length() > 17;
            }
        })
        .mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                String[] str = s.split("\t");
                return new Tuple2<String, String>(str[3], str[1] + "=" + str[9]);
            }
        });
long count = mapreduce1.count();
System.out.println(count);

JavaPairRDD<String, Iterable<String>> mapreduce2 = mapreduce1.groupByKey();
long count1 = mapreduce2.count();
System.out.println(count1);

mapreduce2 = mapreduce2.filter(new Function<Tuple2<String, Iterable<String>>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, Iterable<String>> tuple) throws Exception {
        int sum = 0, k = 7;                              // k is the detection-point threshold
        for (String val : tuple._2()) {
            sum++;
        }
        return sum > k;
    }
});
long count2 = mapreduce2.count();
System.out.println(count2);

JavaRDD<String> mapreduce3 = mapreduce2.map(new Function<Tuple2<String, Iterable<String>>, String>() {
    @Override
    public String call(Tuple2<String, Iterable<String>> tuple) throws Exception {
        String str = "";
        for (String val : tuple._2()) {
            // str is reassigned on every iteration, so only the last value of the group is kept
            str = val.substring(0, 11) + " " + val.substring(12) + "=" + tuple._1().toString();
        }
        return str;
    }
});
long count3 = mapreduce3.count();
System.out.println(count3);
mapreduce3.saveAsTextFile("D://output//temp1");
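(The sparkconf passed to the JavaSparkContext at the top of that snippet is not shown; since everything runs on a single machine it is presumably built along these lines, the app name being an assumption:)

import org.apache.spark.SparkConf;

SparkConf sparkconf = new SparkConf()
        .setAppName("plate-compare")   // app name is an assumption
        .setMaster("local[*]");        // single-machine local mode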
count1, count2, count3 and so on are there so I can compare how much data is processed at each stage. In the end the Hadoop program and the Spark program produce inconsistent results, and the amounts of data are not even on the same order of magnitude, yet in principle the results should be identical.
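As a first sanity check of those volumes, both final outputs can be read back and their record counts compared; a minimal sketch, assuming the Hadoop job wrote to D://output//hadoop (keep in mind the record formats also differ: the Hadoop reducer writes key and value separated by a tab, while the Spark map builds one space/"="-separated string):

JavaRDD<String> hadoopOut = jsc.textFile("D://output//hadoop"); // Hadoop output path is an assumption
JavaRDD<String> sparkOut = jsc.textFile("D://output//temp1");
System.out.println("hadoop output records: " + hadoopOut.count());
System.out.println("spark  output records: " + sparkOut.count());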