(1) Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF. (2) Implement the three methods initialize, process, and close. Hive first calls initialize, which returns the schema of the rows the UDTF will produce (the number of columns and their types). After initialization, process is called to handle the incoming arguments; results are emitted through forward(). Finally, close() is called so that any resources that need cleaning up can be released.
Below is an example I wrote that, for each student, returns the total score, average score, maximum score, and minimum score. For reference.
package com.it.udf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/*
 * For each student, compute and emit the average, maximum, minimum, and total score.
 * Sample input (name, score):
 * A 90
 * A 87
 * B 89
 * C 57
 */
public class getNewUdtf extends GenericUDTF {
    Integer nTotalScore = Integer.valueOf(0); // running total score for the current student
    Float avgScore = Float.valueOf(0);        // average score for the current student
    String strStudent = "";                   // name of the current student
    Integer count = 0;                        // number of score rows seen for the current student
    Integer max = 0;                          // highest score seen for the current student
    Integer min = 100;                        // lowest score seen for the current student
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 2) {
            throw new UDFArgumentLengthException("This UDTF takes exactly two arguments: name and score");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("The first argument must be a primitive (string) type");
        }
        // Declare the output schema: five string columns.
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("studName");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("sumScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("avgScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("maxScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("minScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
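    // process() is called once for each input row: args[0] is the student name, args[1] the score.
    // Note: the aggregation below relies on rows for the same student arriving consecutively
    // (a result is emitted whenever the name changes), so the input must already be grouped by name.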
    @Override
    public void process(Object[] args) throws HiveException {
        if (!strStudent.isEmpty() && !strStudent.equals(args[0].toString())) {
            // The student name has changed: emit the previous student's aggregates.
            ArrayList<String> arrayList = new ArrayList<String>();
            avgScore = nTotalScore / (float) count;
            arrayList.add(String.valueOf(strStudent));
            arrayList.add(String.valueOf(nTotalScore));
            arrayList.add(String.valueOf(avgScore));
            arrayList.add(String.valueOf(max));
            arrayList.add(String.valueOf(min));
            forward(arrayList.toArray(new String[]{}));
            // Reset the accumulators for the next student.
            nTotalScore = 0;
            avgScore = (float) 0;
            max = 0;
            count = 0;
            min = 100;
        }
        strStudent = args[0].toString();
        count++;
        nTotalScore += Integer.parseInt(args[1].toString());
        // Track the maximum and minimum score.
        if (max < Integer.valueOf(args[1].toString())) {
            max = Integer.valueOf(args[1].toString());
        }
        if (min > Integer.valueOf(args[1].toString())) {
            min = Integer.valueOf(args[1].toString());
        }
    }
    @Override
    public void close() throws HiveException {
        // Emit the aggregates for the last student (skipped if no rows were processed).
        if (count > 0) {
            avgScore = nTotalScore / (float) count;
            ArrayList<String> arrayList = new ArrayList<String>();
            arrayList.add(String.valueOf(strStudent));
            arrayList.add(String.valueOf(nTotalScore));
            arrayList.add(String.valueOf(avgScore));
            arrayList.add(String.valueOf(max));
            arrayList.add(String.valueOf(min));
            forward(arrayList.toArray(new String[]{}));
        }
    }
}
Steps:
1.create external table studentscore(name string,score int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\,' stored as textfile;
2.load data local inpath '/root/jar/score.txt' overwrite into table studentscore;
3.add jar hdfs://hadoop:9000/udtftest.jar;
4.create temporary function statics as 'com.it.udf.getNewUdtf';
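The score.txt file loaded in step 2 is assumed to contain comma-separated rows matching the sample in the class comment, for example:
A,90
A,87
B,89
C,57
With the function registered, it can then be invoked with a query along these lines (a sketch: the column aliases repeat the field names declared in initialize, and the per-student logic relies on each student's rows reaching the UDTF consecutively, which holds here because the small input is read by a single map task):
select statics(name, score) as (studName, sumScore, avgScore, maxScore, minScore) from studentscore;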
Execution result:
During execution, you may run into the following problem:
Error during job, obtaining debugging information...
Examining task ID: task_201601271725_0012_m_000002 (and more) from job job_201601271725_0012
Exception in thread "Thread-26" java.lang.RuntimeException: Error while reading from task log url
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getStackTraces(TaskLogProcessor.java:240)
at org.apache.hadoop.hive.ql.exec.JobDebugger.showJobFailDebugInfo(JobDebugger.java:227)
at org.apache.hadoop.hive.ql.exec.JobDebugger.run(JobDebugger.java:92)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL: http://hadoop1:50060/tasklog?taskid=attempt_201601271725_0012_m_000000_1&start=-8193
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1436)
at java.net.URL.openStream(URL.java:1010)
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getStackTraces(TaskLogProcessor.java:192)
... 3 more
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 1 HDFS Read: 0 HDFS Write: 0 FAIL
The troubleshooting process was as follows:
Looking at the error log, the failing URL is:
http://slaver1:50060/tasklog?taskid=attempt_201112211741_0005_m_000000_1&start=-8193
Now look at the Hadoop source code, in TaskLogServlet:
    String attemptIdStr = request.getParameter("attemptid");
    if (attemptIdStr == null) {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
          "Argument attemptid is required");
      return;
    }
So the parameter should be attemptid rather than taskid; the mismatch is most likely caused by inconsistent Hadoop versions.
Then open the corrected URL in a browser: http://slaver1:50060/tasklog?attemptid=attempt_201112211741_0005_m_000000_1&start=-8193
This reveals the real error: a class-not-found problem. From there the fix is straightforward: put the corresponding jar into Hadoop's lib directory and restart MapReduce:
stop-mapred.sh
start-mapred.sh
Notes: 1. The jar can be placed either on the local filesystem or on HDFS. If you hit a null-pointer problem, consider placing the jar on the master node of the Hadoop cluster and also copying it into the lib directory of the corresponding Hadoop installation; otherwise you may run into a NoClassDefFoundError.
2. When one or more parameters need to be passed in, they are received and processed in the second phase, process(); args[0].toString() reads the first argument.