分享

spark 读取oracle,字段类型为Date的处理

YLV 发表于 2016-5-4 17:17:31 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 16 61141
一直没有发过帖子,写的不好大家将就着看,希望有人能用上;
问题描述:
       公司最近一个数据清洗项目,用spark sql从一个oracle库中读取数据,对表中相关业务字段进行清洗,然后存回到另一个oracle中;读取,写入过程测试都能测通,但是在结果库中发现源库中字段类型为Date类型的字段值,写入到目标库中变为yyyy/mm/dd格式了,后面的hh24:mi:ss,不知道哪里去了...
       spark用的是1.5.0版本的

查找问题:
       没办法,只能调试代码了,结果发现spark在从oracle库里读取的时候,获取Date类型的值已经是yyyy/mm/dd,后面的时分秒根本没有;查找源码,发现spark sql里的DateType类型,只支持到yyyy-mm-dd,A date type, supporting "0001-01-01" through "9999-12-31".   这个是源码里的注释,可以看到确实是只到年月日。那有啥方法可以解决呢?一般数据库中所用的时间类型为Date及Timestamp,用timestamp能不能显示时分秒呢,在官网有说明了http://spark.apache.org/docs/lat ... ming-guide.html#sql
  Datetime type
  • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
  • DateType: Represents values comprising values of fields year, month, day.
  可以看到timestamp的确可以显示到时分秒,那接下来的问题就是咋把读取到的Date类型给转换到timestamp了。
  spark sql里有一个JdbcDialect  抽象类,得自己写一个oracle的方言类,扩展那个抽象类,主要实现里面3个方法就ok了,然后再打印Date类型的值,年月日时分秒都出来了,搞定。


部分代码:
    public static void oracleInit() {
        JdbcDialect dialect = new JdbcDialect() {

            //判断是否为oracle
            @Override
            public boolean canHandle(String url) {
                return url.startsWith("jdbc:oracle");
            }

            //用于读取Oracle数据库时数据类型的转换
            @Override
            public Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md){
                if (sqlType == Types.DATE && typeName.equals("DATE") && size == 0)
                    return Option.apply(DataTypes.TimestampType);
                return Option.empty();
            }

            //用于写Oracle数据库时数据类型的转换
            @Override
            public Option<JdbcType> getJDBCType(DataType dt) {
                if (DataTypes.StringType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("VARCHAR2(255)", Types.VARCHAR));
                } else if (DataTypes.BooleanType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(1)", Types.NUMERIC));
                } else if (DataTypes.IntegerType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(10)", Types.NUMERIC));
                } else if (DataTypes.LongType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19)", Types.NUMERIC));
                } else if (DataTypes.DoubleType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.FloatType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.ShortType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(5)", Types.NUMERIC));
                } else if (DataTypes.ByteType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("NUMBER(3)", Types.NUMERIC));
                } else if (DataTypes.BinaryType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("BLOB", Types.BLOB));
                } else if (DataTypes.TimestampType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.DateType.sameType(dt)) {
                    return Option.apply(
                            new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.createDecimalType()
                        .sameType(dt)) { //unlimited
/*                    return DecimalType.Fixed(precision, scale)
                            =>Some(JdbcType("NUMBER(" + precision + "," + scale + ")",
                            java.sql.Types.NUMERIC))*/
                    return Option.apply(
                            new JdbcType("NUMBER(38,4)", Types.NUMERIC));
                }
                return Option.empty();
            }
        };
        //注册此方言
         JdbcDialects.registerDialect(dialect);
//        JdbcDialects.unregisterDialect(dialect);
    }


SparkConf conf = new SparkConf();
conf.setAppName("Test");
//设置启动方式为本地方式
conf.setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);
oracleInit();
System.out.println("================"+sqlContext.getSQLDialect());

Map<String, String> options = new HashMap<String, String>();
options.put("user", "whst");
options.put("password", "123456");
options.put("url", url);
options.put("dbtable", "WHST_EXAMPLE");
//一次取多少行数据
options.put("fetchSize", "20");
options.put("driver", driver);
DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
jdbcDF.show(false);


打印结果:
+---------------------+---------+-------+------+----------+--------+--------------+----------+
|STARTDATE            |STATIONID|POINTID|ITEMID|SAMPLERATE|OBSVALUE|PROCESSINGFLAG|DATE_INDEX|
+---------------------+---------+-------+------+----------+--------+--------------+----------+
|2015-01-03 14:15:21.0|1000     |2      |4112  |01        |null    |0             |1000      |
|2015-01-04 15:15:21.0|1000     |2      |4112  |01        |null    |0             |1000      |
|2015-01-02 12:15:21.0|1000     |2      |4112  |01        |null    |0             |1000      |
|2015-01-01 13:15:21.0|1000     |2      |4112  |01        |null    |0             |1000      |
+---------------------+---------+-------+------+----------+--------+--------------+----------+








已有(16)人评论

跳转到指定楼层
langke93 发表于 2016-5-4 17:48:35
我顶,不错的帖子,感谢分享
回复

使用道具 举报

lihy114 发表于 2016-6-1 10:38:15
楼主,有没有比较简单的spark 读取oracle数据库数据的代码实例,Java实现的,找了半天没找到,多谢
回复

使用道具 举报

lihy114 发表于 2016-6-1 14:18:02
package com.dt.spark.SparkApps.cores;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ReadOracle {
        public static void main(String[] args) {
                SparkConf conf =new SparkConf().setAppName("Spark WordCount written by java").setMaster("local");
                JavaSparkContext sc=new JavaSparkContext(conf); //其底层就是scala的sparkcontext
               
                SQLContext sqlContext = new SQLContext(sc);
               
                String url = "jdbc:oracle:thin:@//192.168.3.218:1521/orcl";
                Map<String, String> options = new HashMap<String, String>();
                options.put("user", "wlsj");
                options.put("password", "wlsj");
                options.put("url", url);
                options.put("dbtable", "test1");
                //一次取多少行数据
                options.put("fetchSize", "20");
//                options.put("driver", driver);
                DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
               
                List<Row> teenagerNames = jdbcDF.collectAsList();
               
                for(Row s : teenagerNames) {
                        System.out.println(s);
                }
        }

}


比较简单的一个,能够读出数据
回复

使用道具 举报

YLV 发表于 2016-6-2 15:09:39
lihy114 发表于 2016-6-1 10:38
楼主,有没有比较简单的spark 读取oracle数据库数据的代码实例,Java实现的,找了半天没找到,多谢

4楼已经给出了,可以
回复

使用道具 举报

lihy114 发表于 2016-6-2 15:48:25
YLV 发表于 2016-6-2 15:09
4楼已经给出了,可以

楼主,4楼的是我自己写的;这些代码在eclipse中可以正常运行,但是打成jar包放到了spark环境中就不行了,报的错误是

16/06/02 01:02:14 ERROR spark.SparkContext: Error initializing SparkContext.
java.net.ConnectException: Call From node243/192.168.3.243 to systex:8020 failed on connection exception: java.net.ConnectException: 拒绝连接; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45

这是什么问题啊,我感觉不是代码问题;是不是跟我打包的jar包有关系,打jar的时候没有将eclipse中用到的spark相关的驱动包放进去

回复

使用道具 举报

YLV 发表于 2016-6-8 08:11:01
lihy114 发表于 2016-6-2 15:48
楼主,4楼的是我自己写的;这些代码在eclipse中可以正常运行,但是打成jar包放到了spark环境中就不行了, ...

spark的驱动不打进包也可以,但你运行的时候得设置些环境变量,保证你的app能读到相关的jar;这种多半情况都是你的spark环境问题;
不知道你最终解决了没,是什么问题呢
回复

使用道具 举报

lihy114 发表于 2016-6-8 08:54:25
YLV 发表于 2016-6-8 08:11
spark的驱动不打进包也可以,但你运行的时候得设置些环境变量,保证你的app能读到相关的jar;这种多半情 ...

非常感谢楼主,我已经解决了,确实是自己的spark集群的配置问题,在spark-default.conf文件的中一个参数spark.eventLog.dir设置的问题,修改后就可以跑通的

设置的时候照抄了网上的一个参数设置
回复

使用道具 举报

lihy114 发表于 2016-6-8 10:29:11
楼主,我想问一下,你们这种将oracle数据库做为数据源的应用场景,spark在进行计算处理的时候,会在多个节点上进行处理吗?还是只能在主节点进行数据处理
回复

使用道具 举报

YLV 发表于 2016-6-8 10:43:28
lihy114 发表于 2016-6-8 10:29
楼主,我想问一下,你们这种将oracle数据库做为数据源的应用场景,spark在进行计算处理的时候,会在多个节 ...

spark处理肯定不会再master节点上处理,都是在worker节点处理;除非你是local模式;而客户端模式,dirver程序会在你启动的机器上启动;这主要取决于你的app启动方式了
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条