[mw_shl_code=java,true]
// Spark configuration for the "dataAnalysis" job.
// local[2] is only a fallback: new SparkConf() loads spark.* system properties,
// so a master supplied externally (e.g. spark-submit --master) now takes
// precedence instead of being overridden by a hard-coded setMaster call.
SparkConf conf = new SparkConf();
conf.setAppName("dataAnalysis");
conf.setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Not used in the lines shown here; presumably consumed later in the file.
SQLContext sqlContext = new SQLContext(sc);
// Path of the input CSV file read below.
String testFileSrc = "/dataAnalysis/testData/testTopTen.csv";
// Parse each CSV line into a FanDataVO.
// The header row (first field "Time") and blank lines are dropped up front.
// Previously the header was turned into an empty record, which then flowed into
// the month-keying and reduce steps.
JavaRDD<FanDataVO> Data = sc.textFile(testFileSrc).filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(String row) throws Exception {
// Limit 2: only the first field is needed to recognise the header row.
return !row.trim().isEmpty() && !row.split(",", 2)[0].equals("Time");
}
}).map(new Function<String, FanDataVO>() {
private static final long serialVersionUID = 1L;
@Override
public FanDataVO call(String row) throws Exception {
// Limit -1 keeps trailing empty fields, so a row whose last columns are empty
// still has all of its positions. Without it, rowArr[20] would throw.
// A row with fewer than 21 fields still throws ArrayIndexOutOfBoundsException.
String[] rowArr = row.split(",", -1);
FanDataVO vo = new FanDataVO();
vo.setTime(rowArr[0]);
vo.setWt_WsAvg(rowArr[7]);
vo.setWt_PowerAvg(rowArr[20]);
vo.setRs(rowArr[8]);
vo.setMast_T(rowArr[19]);
// A single row covers one instant, so its range starts and ends at its own time.
// The per-month reduce later widens these to the month's earliest/latest time.
vo.setDate_Begin(rowArr[0]);
vo.setDate_End(rowArr[0]);
return vo;
}
});
// Pair every record with the month key that DateUtil.getMonthByDays derives from
// its time field, so that records can be aggregated per month by key.
JavaPairRDD<String, FanDataVO> monthPair = Data.mapToPair(
        new PairFunction<FanDataVO, String, FanDataVO>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, FanDataVO> call(FanDataVO record) throws Exception {
                return new Tuple2<String, FanDataVO>(
                        DateUtil.getMonthByDays(record.getTime()), record);
            }
        });
// Collapse each month's records into one.
// The result is the first argument (oldVO) with Date_Begin/Date_End set to the
// earliest/latest bound found on either argument.
// Both sides' Date_Begin/Date_End are used, not newVO's raw time. Spark also feeds
// this function partial per-partition aggregates, and reduceByKey needs an
// associative, commutative combine to give correct per-month min/max values.
monthPair = monthPair.reduceByKey(new Function2<FanDataVO, FanDataVO, FanDataVO>() {
private static final long serialVersionUID = 1L;
@Override
public FanDataVO call(FanDataVO oldVO, FanDataVO newVO) throws Exception {
// Earliest time in the month.
oldVO.setDate_Begin(pickBound(oldVO.getDate_Begin(), newVO.getDate_Begin(), true));
// Latest time in the month.
oldVO.setDate_End(pickBound(oldVO.getDate_End(), newVO.getDate_End(), false));
return oldVO;
}

/**
 * Returns whichever of two time strings is earlier (or later), ignoring blank values.
 *
 * Blank values come, for example, from an empty record. The time strings are
 * compared numerically via Double.parseDouble.
 * NOTE(review): this assumes they are numeric timestamps; confirm against the input.
 *
 * @param current value already held by the accumulator; kept on ties
 * @param candidate value from the other side of the merge
 * @param earliest true to keep the smaller value, false to keep the larger
 * @return the chosen string, unchanged, or the non-blank one if the other is blank
 */
private String pickBound(String current, String candidate, boolean earliest) {
if (StringUtils.isBlank(current)) {
return candidate;
}
if (StringUtils.isBlank(candidate)) {
return current;
}
int cmp = Double.compare(Double.parseDouble(current), Double.parseDouble(candidate));
return (earliest ? cmp <= 0 : cmp >= 0) ? current : candidate;
}
});
[/mw_shl_code]
|