问题1:在MapReduce中,使用默认的TextInputFormat时,数据是按行读取的,这里key是默认key,value是我自定义的Trans对象;我将所有数据处理都写在了一个map方法中,map中的功能被分为4块,分别写入了calculateSup(trans),calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)中。但是现在出现一个问题:当进入第一个calculateSup(trans)函数之后,Hadoop就会一直向下读取key-value,从而还没进入后面调用的calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)方法,就已经把所有数据读取完了。如果我要让一条记录依次被calculateSup(trans),calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)这些方法处理完之后,再依次处理下一条记录,需要如何设置?
问题2:请问如何能简便快捷地对MapReduce程序进行调试?我现在的处理方法是在程序中加入控制台输出语句System.out.println(),之后将程序打包为jar包上传到NameNode节点运行,再查看输出日志进行调试,还有其他更方便的调试方法吗?
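/**
 * Mapper that matches preferential (discount) policies.
 * 1. Loads the base reference data into memory when the mapper starts.
 * 2. Reads transaction records, matches preferential policies and computes income and expenditure:
 *    reads one transaction record and converts it; an invalid record is written to the error file.
 */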
public class MatchPPMapper extends Mapper<Object, Text, Text, GBKText> {
// SLF4J logger for this mapper.
private static Logger logger = LoggerFactory.getLogger(MatchPPMapper.class);
// Debug switches: they gate the System.out debug prints in setup().
public static boolean flag = false;
public static boolean flag2 = false;
public static boolean iflag = false;
// Reference-data lists; setup() fills them from the distributed-cache files.
private ArrayList<SupCommision> supList = new ArrayList<SupCommision>();// preferential (discount) rule list
private ArrayList<GoodsPrice> gpriceList = new ArrayList<GoodsPrice>();
private ArrayList<ClientDiscount> clientList = new ArrayList<ClientDiscount>();
private ArrayList<TieredData> tieredDataList = new ArrayList<TieredData>();
// Newly added lists: 1. customer discount payout list, 2. customer volume-tier payout list,
// 3. customer points matching-rule list, 4. customer member list, 5. customer volume-tier data list
private ArrayList<Member> memberList = new ArrayList<Member>();
private ArrayList<Integral> integralList = new ArrayList<Integral>();
private ArrayList<CustomPreferential> cPreferList = new ArrayList<CustomPreferential>();
private ArrayList<GradedPreferential> gPreferList = new ArrayList<GradedPreferential>();
private ArrayList<GradedData> gradedDataList = new ArrayList<GradedData>();
// Output key/value holders reused by map() for every record it writes.
private Text outPutKey = new Text();
private GBKText outPutValue = new GBKText();
// NOTE(review): sm and cal are not referenced in the visible part of the class; presumably used by the calculate* methods - confirm.
private PolicyMatch sm = new PolicyMatch();
private Calculate cal = new Calculate();
private String randomStr = "0"; // A random value obtained before the mapper runs so that each map task outputs a different file name; if it is not set, later map tasks overwrite the output of earlier ones.
/**
 * Loads the reference data files from the distributed cache into the in-memory lists
 * before any record is mapped.
 *
 * <p>Each local cache file is recognised by a substring of its path
 * (for example {@code sup_commission.txt}), read line by line as GBK text, and every line
 * whose {@code parse} call returns {@code true} is appended to the matching list.
 * Files with an unrecognised name are ignored.
 *
 * <p>Loading is best-effort: an {@link IOException} is logged and stops the loading of the
 * remaining files, but it is not rethrown, so the task continues with whatever was loaded
 * up to that point. Every reader is closed as soon as its file has been processed.
 *
 * @param context the task context; its configuration is used to locate the cache files
 * @throws IOException declared by the overridden method; I/O errors raised while loading
 *         are logged rather than propagated
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(Context context) throws IOException,
        InterruptedException {
    // Per-task random string; map() uses it as the output key for unmatched records.
    randomStr = UUID.randomUUID().toString();
    BufferedReader in = null;
    logger.warn("Map setup....................");
    try {
        // Fetch the locally cached files of the current job.
        Path[] paths = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());
        if (paths == null) {
            // Nothing was registered in the distributed cache: there is no reference data to load.
            logger.warn("No distributed cache files found; reference data lists stay empty");
            return;
        }
        if (flag && flag2) {
            for (Path p : paths) {
                System.out.println("##文件路径是: " + p.toString());
            }
        }
        String value = null;
        if (flag && flag2) {
            System.out.println("正在导入原始数据!!");
        }
        for (Path path : paths) {
            String pathName = path.toString();
            if (flag && flag2) {
                System.out.println("路径 path: " + pathName);
            }
            if (pathName.contains("sup_commission.txt")) {
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    SupCommision sup = new SupCommision();
                    if (sup.parse(value)) {
                        supList.add(sup);
                    }
                }
                if (flag && flag2) {
                    System.out.println("SupNum:" + supList.size());
                }
            } else if (pathName.contains("goods_price.txt")) {
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    GoodsPrice gp = new GoodsPrice();
                    if (flag) {
                        System.out.println("读取行数据 " + value);
                    }
                    if (gp.parse(value)) {
                        gpriceList.add(gp);
                    }
                }
                if (flag) {
                    System.out.println("GoodsNum:——" + gpriceList.size());
                }
            } else if (pathName.contains("tieredData.txt")) {
                System.out.println("开始导入供应商达量数据");
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    TieredData td = new TieredData();
                    if (flag) {
                        System.out.println("读取行数据" + value);
                    }
                    if (td.parse(value)) {
                        tieredDataList.add(td);
                    }
                }
                if (flag) {
                    System.out.println("tieredDataNum:" + tieredDataList.size());
                }
            } else if (pathName.contains("custom_preferential.txt")) {
                // Newly added: customer discount payouts.
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    CustomPreferential cPrefer = new CustomPreferential();
                    if (cPrefer.parse(value)) {
                        cPreferList.add(cPrefer);
                        if (flag) {
                            System.out.println("导入非达量优惠政策!!!!! 读入数据个数"
                                    + cPreferList.size());
                        }
                    }
                }
                if (flag) {
                    System.out.println("custom_preferentialNum:"
                            + cPreferList.size());
                }
            } else if (pathName.contains("graded_preferential.txt")) {
                // Newly added: customer volume-tier discount base data.
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    GradedPreferential gPrefer = new GradedPreferential();
                    if (gPrefer.parse(value)) {
                        gPreferList.add(gPrefer);
                    }
                }
                if (flag) {
                    System.out.println("graded_preferentialNum:"
                            + gPreferList.size());
                }
            } else if (pathName.contains("member_info.txt")) {
                // Newly added: member base data.
                in = openCacheFileReader(path);
                if (flag || iflag) {
                    // No line of this file has been read yet, so "value" is still null here.
                    System.out.println(" 导入会员信息,value=" + value);
                }
                while (null != (value = in.readLine())) {
                    Member member = new Member();
                    if (member.parse(value)) {
                        memberList.add(member);
                    }
                }
            } else if (pathName.contains("Integral_info.txt")) {
                // Newly added: points rule data.
                in = openCacheFileReader(path);
                if (flag || iflag) {
                    // No line of this file has been read yet, so "value" is still null here.
                    System.out.println(" 导入积分规则信息,value=" + value);
                }
                while (null != (value = in.readLine())) {
                    Integral integral = new Integral();
                    if (integral.parse(value)) {
                        integralList.add(integral);
                    }
                }
            } else if (pathName.contains("Graded_info.txt")) {
                // Newly added: volume-tier matching rule data.
                in = openCacheFileReader(path);
                while (null != (value = in.readLine())) {
                    GradedData gradedData = new GradedData();
                    if (gradedData.parse(value)) {
                        gradedDataList.add(gradedData);
                    }
                }
            }
            // Release this file's reader before the next file is opened, so that
            // no more than one reader is open at a time and none is leaked.
            closeReaderQuietly(in);
            in = null;
        }
    } catch (IOException e) {
        // Best-effort loading: report the failure and continue with the data loaded so far.
        logger.error("Failed to load reference data from the distributed cache", e);
    } finally {
        // Covers the reader that was open when an exception interrupted the loop;
        // a null reader (nothing opened) is tolerated.
        closeReaderQuietly(in);
    }
}

/** Opens a local distributed-cache file as a GBK text reader, stripping any "file:" prefix from its path. */
private BufferedReader openCacheFileReader(Path path) throws IOException {
    return new BufferedReader(new InputStreamReader(
            new FileInputStream(path.toString().replace("file:", "")),
            "GBK"));
}

/** Closes the reader if it is non-null; a failure to close is logged and otherwise ignored. */
private void closeReaderQuietly(BufferedReader reader) {
    if (reader == null) {
        return;
    }
    try {
        reader.close();
    } catch (IOException e) {
        logger.warn("Failed to close cache file reader", e);
    }
}
/**
 * Processes one input record: decodes the line as GBK, parses it into a {@code Trans}
 * and runs the four calculations on it in a fixed order.
 *
 * <p>An empty or null value is reported on standard output and skipped. A line that fails
 * {@code Trans.parse} is written to the output under the key {@code "error_" + key}.
 * Otherwise {@code calculateSup}, {@code calculatePrefer}, {@code calculateGradedPrefer}
 * and {@code calculateIntegral} are each called once, one after another, on the same
 * {@code Trans}; if all four return {@code false} the transaction is written under the
 * per-task random key.
 *
 * <p>The Hadoop framework invokes this method once per input record, so the four
 * calculations of one record complete before this method is called for the next record.
 *
 * @param key the input key supplied by the input format; only used to build the error key
 * @param value the raw input line, expected to hold GBK-encoded bytes
 * @param context the task context used to emit output records
 * @throws IOException if decoding the value or writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
@Override
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // getLength() == 0 is the same test as toString().equals(""), without decoding the record.
    if (value == null || value.getLength() == 0) {
        System.out.println("Value is NULL!! Value= " + value);
        return;
    }
    // Only the first getLength() bytes of the backing array belong to this record.
    String valueStr = new String(value.getBytes(), 0, value.getLength(),
            "GBK");
    Trans tran = new Trans();
    // parse() loads the fields of the input line into the Trans object.
    if (!tran.parse(valueStr)) {
        System.out.println("Error: " + tran.toString());
        outPutKey.set("error_" + key.toString());
        outPutValue.set(valueStr);
        context.write(outPutKey, outPutValue);
        return;
    }
    // All four calculations are evaluated unconditionally (no short-circuiting),
    // so each of them sees every valid record.
    // Supplier commission.
    boolean hasSupTier = calculateSup(tran);
    // Customer non-volume-tier discount payout.
    // NOTE(review): the author reports that once this step is entered all data appears to be
    // processed at once, and the volume-tier step then bails out because "tran" is empty.
    // The cause is not visible in this method - calculatePrefer is defined elsewhere; verify
    // whether it mutates or clears the Trans it is given.
    boolean preferNum = calculatePrefer(tran);
    // Customer volume-tier discount payout.
    boolean gPreferNum = calculateGradedPrefer(tran);
    // Member points.
    boolean integralNum = calculateIntegral(tran);
    boolean anyCalculationApplied = hasSupTier || preferNum || gPreferNum
            || integralNum;
    if (!anyCalculationApplied) {
        // No calculation reported a result: emit the transaction itself under the per-task key.
        outPutKey.set(randomStr);
        outPutValue.set(tran.toString());
        context.write(outPutKey, outPutValue);
    }
}// map
|