一直没有发过帖子,写的不好大家将就着看,希望有人能用上;
问题描述:
公司最近一个数据清洗项目,用spark sql从一个oracle库中读取数据,对表中相关业务字段进行清洗,然后存回到另一个oracle中;读取,写入过程测试都能测通,但是在结果库中发现源库中字段类型为Date类型的字段值,写入到目标库中变为yyyy/mm/dd格式了,后面的hh24:mi:ss,不知道哪里去了...
spark用的是1.5.0版本的
查找问题:
没办法,只能调试代码了,结果发现spark在从oracle库里读取的时候,获取Date类型的值已经是yyyy/mm/dd,后面的时分秒根本没有;查找源码,发现spark sql里的DateType类型,只支持到yyyy-mm-dd,A date type, supporting "0001-01-01" through "9999-12-31". 这个是源码里的注释,可以看到确实是只到年月日。那有啥方法可以解决呢?一般数据库中所用的时间类型为Date及Timestamp,用timestamp能不能显示时分秒呢,在官网有说明了http://spark.apache.org/docs/lat ... ming-guide.html#sql
Datetime type
- TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
- DateType: Represents values comprising values of fields year, month, day.
可以看到timestamp的确可以显示到时分秒,那接下来的问题就是咋把读取到的Date类型给转换到timestamp了。
spark sql里有一个JdbcDialect 抽象类,得自己写一个oracle的方言类,扩展那个抽象类,主要实现里面3个方法就ok了,然后再打印Date类型的值,年月日时分秒都出来了,搞定。
部分代码:
public static void oracleInit() {
JdbcDialect dialect = new JdbcDialect() {
//判断是否为oracle库
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle");
}
//用于读取Oracle数据库时数据类型的转换
@Override
public Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md){
if (sqlType == Types.DATE && typeName.equals("DATE") && size == 0)
return Option.apply(DataTypes.TimestampType);
return Option.empty();
}
//用于写Oracle数据库时数据类型的转换
@Override
public Option<JdbcType> getJDBCType(DataType dt) {
if (DataTypes.StringType.sameType(dt)) {
return Option.apply(
new JdbcType("VARCHAR2(255)", Types.VARCHAR));
} else if (DataTypes.BooleanType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(1)", Types.NUMERIC));
} else if (DataTypes.IntegerType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(10)", Types.NUMERIC));
} else if (DataTypes.LongType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(19)", Types.NUMERIC));
} else if (DataTypes.DoubleType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(19,4)", Types.NUMERIC));
} else if (DataTypes.FloatType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(19,4)", Types.NUMERIC));
} else if (DataTypes.ShortType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(5)", Types.NUMERIC));
} else if (DataTypes.ByteType.sameType(dt)) {
return Option.apply(
new JdbcType("NUMBER(3)", Types.NUMERIC));
} else if (DataTypes.BinaryType.sameType(dt)) {
return Option.apply(
new JdbcType("BLOB", Types.BLOB));
} else if (DataTypes.TimestampType.sameType(dt)) {
return Option.apply(
new JdbcType("DATE", Types.DATE));
} else if (DataTypes.DateType.sameType(dt)) {
return Option.apply(
new JdbcType("DATE", Types.DATE));
} else if (DataTypes.createDecimalType()
.sameType(dt)) { //unlimited
/* return DecimalType.Fixed(precision, scale)
=>Some(JdbcType("NUMBER(" + precision + "," + scale + ")",
java.sql.Types.NUMERIC))*/
return Option.apply(
new JdbcType("NUMBER(38,4)", Types.NUMERIC));
}
return Option.empty();
}
};
//注册此方言
JdbcDialects.registerDialect(dialect);
// JdbcDialects.unregisterDialect(dialect);
}
SparkConf conf = new SparkConf();
conf.setAppName("Test");
//设置启动方式为本地方式
conf.setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
oracleInit();
System.out.println("================"+sqlContext.getSQLDialect());
Map<String, String> options = new HashMap<String, String>();
options.put("user", "whst");
options.put("password", "123456");
options.put("url", url);
options.put("dbtable", "WHST_EXAMPLE");
//一次取多少行数据
options.put("fetchSize", "20");
options.put("driver", driver);
DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
jdbcDF.show(false);
打印结果:
+---------------------+---------+-------+------+----------+--------+--------------+----------+
|STARTDATE |STATIONID|POINTID|ITEMID|SAMPLERATE|OBSVALUE|PROCESSINGFLAG|DATE_INDEX|
+---------------------+---------+-------+------+----------+--------+--------------+----------+
|2015-01-03 14:15:21.0|1000 |2 |4112 |01 |null |0 |1000 |
|2015-01-04 15:15:21.0|1000 |2 |4112 |01 |null |0 |1000 |
|2015-01-02 12:15:21.0|1000 |2 |4112 |01 |null |0 |1000 |
|2015-01-01 13:15:21.0|1000 |2 |4112 |01 |null |0 |1000 |
+---------------------+---------+-------+------+----------+--------+--------------+----------+
|
|