
Building an Enterprise Data Warehouse (7): Building the DWD Layer

Guiding questions:
1. How is the DWD-layer user-behavior start log parsed, and what does its table look like?
2. How is the get_json_object function used?
3. How is the DWD-layer user-behavior event data parsed?
4. How do you write a custom UDF function?



1. Data Warehouse Construction: the DWD Layer
  • 1) Parse the user behavior data
  • 2) Filter out records whose core fields are null
  • 3) Remodel the business data with a dimensional model, i.e. degenerate the dimensions

1.1 DWD Layer (Parsing the User-Behavior Start Log)

1.1.1 Create the start-log table

1) DDL
  drop table if exists dwd_start_log;
  CREATE EXTERNAL TABLE dwd_start_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `entry` string,
  `open_ad_type` string,
  `action` string,
  `loading_time` string,
  `detail` string,
  `extend1` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_start_log/'
  TBLPROPERTIES('parquet.compression'='lzo');

Note: the data is stored as Parquet, which is splittable, so there is no need to create an additional index on the data.
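A quick way to confirm the storage settings after creating the table (a routine check, using only standard Hive commands) is:

  desc formatted dwd_start_log;

The storage section of the output should show the Parquet SerDe and input/output formats, and the table parameters should include parquet.compression=lzo.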

1.1.2 Using the get_json_object function

1) Input data xjson
  Xjson=[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]
2) Extract the first JSON object
  SELECT get_json_object(xjson,"$.[0]") FROM person;
Result: {"name":"大郎","sex":"男","age":"25"}
3) Extract the value of the age field from the first JSON object
  SELECT get_json_object(xjson,"$.[0].age") FROM person;
Result: 25
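The same JSON-path syntax addresses any element or field of the array. For instance, against the same sample xjson column and person table, the name of the second object:

  SELECT get_json_object(xjson,"$.[1].name") FROM person;

Result: 西门庆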

1.1.3 Load data into the start-log table
  insert overwrite table dwd_start_log
  PARTITION (dt='2020-03-10')
  select
  get_json_object(line,'$.mid') mid_id,
  get_json_object(line,'$.uid') user_id,
  get_json_object(line,'$.vc') version_code,
  get_json_object(line,'$.vn') version_name,
  get_json_object(line,'$.l') lang,
  get_json_object(line,'$.sr') source,
  get_json_object(line,'$.os') os,
  get_json_object(line,'$.ar') area,
  get_json_object(line,'$.md') model,
  get_json_object(line,'$.ba') brand,
  get_json_object(line,'$.sv') sdk_version,
  get_json_object(line,'$.g') gmail,
  get_json_object(line,'$.hw') height_width,
  get_json_object(line,'$.t') app_time,
  get_json_object(line,'$.nw') network,
  get_json_object(line,'$.ln') lng,
  get_json_object(line,'$.la') lat,
  get_json_object(line,'$.entry') entry,
  get_json_object(line,'$.open_ad_type') open_ad_type,
  get_json_object(line,'$.action') action,
  get_json_object(line,'$.loading_time') loading_time,
  get_json_object(line,'$.detail') detail,
  get_json_object(line,'$.extend1') extend1
  from ods_start_log
  where dt='2020-03-10';

Test:
  select * from dwd_start_log where dt='2020-03-10' limit 2;

1.1.4 Data loading script for the DWD-layer start log

1) vim ods_to_dwd_log.sh
Add the following content to the script:

  #!/bin/bash
  # Define variables for easy modification
  APP=gmall
  hive=/opt/modules/hive/bin/hive
  # If a date is passed in, use it; otherwise use the day before today
  if [ -n "$1" ] ;then
  do_date=$1
  else
  do_date=`date -d "-1 day" +%F`
  fi
  sql="
  set hive.exec.dynamic.partition.mode=nonstrict;
  insert overwrite table "$APP".dwd_start_log
  PARTITION (dt='$do_date')
  select
  get_json_object(line,'$.mid') mid_id,
  get_json_object(line,'$.uid') user_id,
  get_json_object(line,'$.vc') version_code,
  get_json_object(line,'$.vn') version_name,
  get_json_object(line,'$.l') lang,
  get_json_object(line,'$.sr') source,
  get_json_object(line,'$.os') os,
  get_json_object(line,'$.ar') area,
  get_json_object(line,'$.md') model,
  get_json_object(line,'$.ba') brand,
  get_json_object(line,'$.sv') sdk_version,
  get_json_object(line,'$.g') gmail,
  get_json_object(line,'$.hw') height_width,
  get_json_object(line,'$.t') app_time,
  get_json_object(line,'$.nw') network,
  get_json_object(line,'$.ln') lng,
  get_json_object(line,'$.la') lat,
  get_json_object(line,'$.entry') entry,
  get_json_object(line,'$.open_ad_type') open_ad_type,
  get_json_object(line,'$.action') action,
  get_json_object(line,'$.loading_time') loading_time,
  get_json_object(line,'$.detail') detail,
  get_json_object(line,'$.extend1') extend1
  from "$APP".ods_start_log
  where dt='$do_date';
  "
  $hive -e "$sql"

2) Make the script executable
  chmod 770 ods_to_dwd_log.sh
3) Run the script
  ods_to_dwd_log.sh 2020-03-11
4) Check the imported data
  select * from dwd_start_log where dt='2020-03-11' limit 2;


1.2 DWD Layer (Parsing the User-Behavior Event Logs)

1.2.1 Create the base event detail table

The detail table stores the detail data converted from the raw ODS-layer table.
1) Create the event-log base detail table
  drop table if exists dwd_base_event_log;
  CREATE EXTERNAL TABLE dwd_base_event_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `event_name` string,
    `event_json` string,
    `server_time` string)
  PARTITIONED BY (`dt` string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_base_event_log/'
  TBLPROPERTIES('parquet.compression'='lzo');

2) Note: event_name and event_json hold the event name and the full event JSON respectively. This is where the one-to-many structure of the raw log is split apart: each raw log line carries an array of events, and we need to flatten it, which requires a custom UDF and a UDTF. A sketch of the resulting query pattern follows.
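As a preview of the flattening pattern used in section 1.2.4 (a sketch only; it assumes the custom UDF base_analizer from 1.2.2 and the UDTF flat_analizer from 1.2.3 are already registered in Hive), one raw line whose et array holds N events becomes N output rows:

  select
    base_analizer(line,'mid') mid_id,   -- one common field pulled out of the "cm" object
    event_name,                         -- the "en" of each individual event
    event_json                          -- the full JSON of that individual event
  from ods_event_log
  lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name, event_json
  where dt='2020-03-10';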

1.2.2 Custom UDF (parsing the common fields)

A UDF takes one row in and produces one row out ("one in, one out").
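Built-in scalar functions follow the same contract; for example, upper() returns exactly one value for each input row, which is also how the base_analizer UDF below will behave:

  select upper('dwd_start_log');   -- one row in, one row out: DWD_START_LOG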

1) Create a Maven project: hivefunction
2) Create the package: com.zsy.udf
3) Add the following to pom.xml
  <properties>
          <hive.version>2.3.0</hive.version>
  </properties>
  <repositories>
          <repository>
                  <id>spring-plugin</id>
                  <url>https://repo.spring.io/plugins-release/</url>
          </repository>
  </repositories>
  <dependencies>
          <!-- Hive dependency -->
          <dependency>
                  <groupId>org.apache.hive</groupId>
                  <artifactId>hive-exec</artifactId>
                  <version>${hive.version}</version>
          </dependency>
  </dependencies>
  <build>
          <plugins>
                  <plugin>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>2.3.2</version>
                          <configuration>
                                  <source>1.8</source>
                                  <target>1.8</target>
                          </configuration>
                  </plugin>
                  <plugin>
                          <artifactId>maven-assembly-plugin</artifactId>
                          <configuration>
                                  <descriptorRefs>
                                          <descriptorRef>jar-with-dependencies</descriptorRef>
                                  </descriptorRefs>
                          </configuration>
                          <executions>
                                  <execution>
                                          <id>make-assembly</id>
                                          <phase>package</phase>
                                          <goals>
                                                  <goal>single</goal>
                                          </goals>
                                  </execution>
                          </executions>
                  </plugin>
          </plugins>
  </build>

Note 1: if the Hive jars fail to download, add the following parameters to the Maven settings in IDEA:
  -Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
  -Dmaven.wagon.http.ssl.ignore.validity.dates=true


For details, see the blog post "Ignoring SSL certificate validation when Maven downloads dependencies".
Note 2: if the following error appears during packaging, the Maven build JVM has run out of thread stack space:
  Exception in thread "main" java.lang.StackOverflowError

Fix: add -Xss4m to the VM options used by the Maven build in IDEA.
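A minimal sketch of one way to apply the same flag when building from the command line instead of IDEA (this assumes the standard MAVEN_OPTS environment variable; in IDEA the equivalent is the Maven runner's VM options field):

  # give the Maven JVM a larger thread stack before packaging
  export MAVEN_OPTS="-Xss4m"
  mvn clean package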

4) The UDF that parses the common fields
  package com.zsy.udf;

  import org.apache.commons.lang.StringUtils;
  import org.apache.hadoop.hive.ql.exec.UDF;
  import org.json.JSONObject;

  public class BaseFieldUDF extends UDF {
      public String evaluate(String line,String key){
          // 1. Split the line into server time and JSON payload
          String[] log = line.split("\\|");
          String result = "";
          // 2. Validate
          if(log.length != 2 || StringUtils.isBlank(log[1])){
              return result;
          }
          // 3. Parse the JSON payload
          JSONObject json = new JSONObject(log[1].trim());
          // 4. Return the value that matches the requested key
          if("st".equals(key)){
              result = log[0].trim();
          }else if("et".equals(key)){
              if(json.has("et")){
                  result = json.getString("et");
              }
          }else{
              JSONObject cm = json.getJSONObject("cm");
              if(cm.has(key)){
                  result = cm.getString(key);
              }
          }
          return result;
      }

      /**
       * Test
       */
  //    public static void main(String[] args) {
  //        String line = "1583776132686|{"cm":{"ln":"-42.8","sv":"V2.3.9","os":"8.1.7","g":"X470IP70@gmail.com","mid":"0","nw":"4G","l":"en","vc":"13","hw":"1080*1920","ar":"MX","uid":"0","t":"1583758268106","la":"-31.3","md":"sumsung-18","vn":"1.1.1","ba":"Sumsung","sr":"M"},"ap":"app","et":[{"ett":"1583685512624","en":"display","kv":{"goodsid":"0","action":"2","extend1":"2","place":"1","category":"17"}},{"ett":"1583769686402","en":"newsdetail","kv":{"entry":"3","goodsid":"1","news_staytime":"16","loading_time":"0","action":"4","showtype":"5","category":"97","type1":""}},{"ett":"1583709065211","en":"ad","kv":{"activityId":"1","displayMills":"58537","entry":"1","action":"3","contentType":"0"}},{"ett":"1583693966746","en":"active_background","kv":{"active_source":"3"}},{"ett":"1583734521683","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1583755388633","en":"praise","kv":{"target_id":0,"id":1,"type":3,"add_time":"1583713812739","userid":4}}]}";
  //        String result = new BaseFieldUDF().evaluate(line, "st");
  //        System.out.println(result);
  //    }
  }

1.2.3 Custom UDTF (parsing the event field)

A UDTF takes one row in and produces multiple rows out ("one in, many out").
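The built-in explode() table function is the simplest example of this contract, turning one row holding an array into one row per element:

  select explode(array('display','newsdetail','ad'));
  -- returns three rows: display, newsdetail, ad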

1) Create the package: com.zsy.udtf
2) In the com.zsy.udtf package, create the class: EventJsonUDTF
3) It is used to expand the event (business) fields
  package com.zsy.udtf;

  import org.apache.commons.lang.StringUtils;
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.json.JSONArray;
  import org.json.JSONException;
  import java.util.ArrayList;
  import java.util.List;

  public class EventJsonUDTF extends GenericUDTF {
      @Override
      public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
          // Define the names and types of the UDTF's output columns
          List<String> fieldName = new ArrayList<>();
          List<ObjectInspector> fieldType = new ArrayList<>();
          fieldName.add("event_name");
          fieldName.add("event_json");
          fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
          fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
          return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType);
      }

      @Override
      public void process(Object[] objects) throws HiveException {
          // 1. Get the input: the JSON array ("et") produced by the UDF
          String input = objects[0].toString();
          // 2. Validate
          if (StringUtils.isBlank(input)) {
              return;
          } else {
              JSONArray ja = new JSONArray(input);
              if (ja == null) {
                  return;
              }
              // Iterate over every element of the array and wrap it as (event name, event JSON)
              for (int i = 0; i < ja.length(); i++) {
                  String[] result = new String[2];
                  try {
                      result[0] = ja.getJSONObject(i).getString("en");
                      result[1] = ja.getString(i);
                  } catch (JSONException ex) {
                      continue;
                  }
                  // Emit the row
                  forward(result);
              }
          }
      }

      @Override
      public void close() throws HiveException {
      }
  }
4) Package the jar and upload it to /user/hive/jars on HDFS
  hdfs dfs -mkdir /user/hive/jars
  hdfs dfs -put ./hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

Note: what if the custom functions are modified and the jar is rebuilt? Simply replace the old jar at the same HDFS path and restart the Hive client.

1.2.4 Parse the event logs into the base detail table

1) Parse the event logs into the base detail table
  insert overwrite table dwd_base_event_log partition(dt='2020-03-10')
  select
  base_analizer(line,'mid') as mid_id,
  base_analizer(line,'uid') as user_id,
  base_analizer(line,'vc') as version_code,
  base_analizer(line,'vn') as version_name,
  base_analizer(line,'l') as lang,
  base_analizer(line,'sr') as source,
  base_analizer(line,'os') as os,
  base_analizer(line,'ar') as area,
  base_analizer(line,'md') as model,
  base_analizer(line,'ba') as brand,
  base_analizer(line,'sv') as sdk_version,
  base_analizer(line,'g') as gmail,
  base_analizer(line,'hw') as height_width,
  base_analizer(line,'t') as app_time,
  base_analizer(line,'nw') as network,
  base_analizer(line,'ln') as lng,
  base_analizer(line,'la') as lat,
  event_name,
  event_json,
  base_analizer(line,'st') as server_time
  from ods_event_log
  lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name, event_json
  where dt='2020-03-10' and base_analizer(line,'et')<>'';

2) Test
select * from dwd_base_event_log where dt='2020-03-10' limit 2;
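If the result set comes back empty, a quick sanity check (assuming the functions were registered as sketched in 1.2.3) is to run the UDF directly against the ODS table and confirm it returns non-empty values:

  select base_analizer(line,'st') st, base_analizer(line,'et') et
  from ods_event_log
  where dt='2020-03-10'
  limit 1;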


1.2.5 DWD-layer data parsing script

1) vim ods_to_dwd_base_log.sh
Add the following content to the script:
  #!/bin/bash
  # Define variables for easy modification
  APP=gmall
  hive=/opt/modules/hive/bin/hive
  # If a date is passed in, use it; otherwise use the day before today
  if [ -n "$1" ] ;then
  do_date=$1
  else
  do_date=`date -d "-1 day" +%F`
  fi
  sql="
  use gmall;
  insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
  select
  base_analizer(line,'mid') as mid_id,
  base_analizer(line,'uid') as user_id,
  base_analizer(line,'vc') as version_code,
  base_analizer(line,'vn') as version_name,
  base_analizer(line,'l') as lang,
  base_analizer(line,'sr') as source,
  base_analizer(line,'os') as os,
  base_analizer(line,'ar') as area,
  base_analizer(line,'md') as model,
  base_analizer(line,'ba') as brand,
  base_analizer(line,'sv') as sdk_version,
  base_analizer(line,'g') as gmail,
  base_analizer(line,'hw') as height_width,
  base_analizer(line,'t') as app_time,
  base_analizer(line,'nw') as network,
  base_analizer(line,'ln') as lng,
  base_analizer(line,'la') as lat,
  event_name,
  event_json,
  base_analizer(line,'st') as server_time
  from "$APP".ods_event_log
  lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name, event_json
  where dt='$do_date' and base_analizer(line,'et')<>'';
  "
  $hive -e "$sql"

Note: when custom functions are used, the SQL in the script must first switch to the database where they were registered, e.g. use gmall;
2) Make the script executable
  chmod 770 ods_to_dwd_base_log.sh
3) Run the script
  ods_to_dwd_base_log.sh 2020-03-11
4) Check the imported data
  select * from dwd_base_event_log where dt='2020-03-11' limit 2;

1.3 DWD Layer (Deriving the User-Behavior Event Tables)

1.3.1 Product click table

1) DDL
  drop table if exists dwd_display_log;
  CREATE EXTERNAL TABLE dwd_display_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `action` string,
  `goodsid` string,
  `place` string,
  `extend1` string,
  `category` string,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_display_log/'
  TBLPROPERTIES('parquet.compression'='lzo');

2) Load data
  insert overwrite table dwd_display_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.goodsid') goodsid,
  get_json_object(event_json,'$.kv.place') place,
  get_json_object(event_json,'$.kv.extend1') extend1,
  get_json_object(event_json,'$.kv.category') category,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='display';

3) Test
  select * from dwd_display_log where dt='2020-03-10' limit 2;
1.3.2 Product detail page table

1) DDL
  drop table if exists dwd_newsdetail_log;
  CREATE EXTERNAL TABLE dwd_newsdetail_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `entry` string,
  `action` string,
  `goodsid` string,
  `showtype` string,
  `news_staytime` string,
  `loading_time` string,
  `type1` string,
  `category` string,
  `server_time` string)
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_newsdetail_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.entry') entry,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.goodsid') goodsid,
  get_json_object(event_json,'$.kv.showtype') showtype,
  get_json_object(event_json,'$.kv.news_staytime') news_staytime,
  get_json_object(event_json,'$.kv.loading_time') loading_time,
  get_json_object(event_json,'$.kv.type1') type1,
  get_json_object(event_json,'$.kv.category') category,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='newsdetail';

3) Test
  select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;

1.3.3 Product list page table
1) DDL

  drop table if exists dwd_loading_log;
  CREATE EXTERNAL TABLE dwd_loading_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `action` string,
  `loading_time` string,
  `loading_way` string,
  `extend1` string,
  `extend2` string,
  `type` string,
  `type1` string,
  `server_time` string)
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_loading_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_loading_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.loading_time') loading_time,
  get_json_object(event_json,'$.kv.loading_way') loading_way,
  get_json_object(event_json,'$.kv.extend1') extend1,
  get_json_object(event_json,'$.kv.extend2') extend2,
  get_json_object(event_json,'$.kv.type') type,
  get_json_object(event_json,'$.kv.type1') type1,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='loading';

3) Test
  select * from dwd_loading_log where dt='2020-03-10' limit 2;
1.3.4 Ad table

1) DDL
  drop table if exists dwd_ad_log;
  CREATE EXTERNAL TABLE dwd_ad_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `entry` string,
  `action` string,
  `contentType` string,
  `displayMills` string,
  `itemId` string,
  `activityId` string,
  `server_time` string)
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_ad_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_ad_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.entry') entry,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.contentType') contentType,
  get_json_object(event_json,'$.kv.displayMills') displayMills,
  get_json_object(event_json,'$.kv.itemId') itemId,
  get_json_object(event_json,'$.kv.activityId') activityId,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='ad';

3) Test
  select * from dwd_ad_log where dt='2020-03-10' limit 2;

1.3.5 Notification table

1) DDL
  drop table if exists dwd_notification_log;
  CREATE EXTERNAL TABLE dwd_notification_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `action` string,
  `noti_type` string,
  `ap_time` string,
  `content` string,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_notification_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_notification_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.noti_type') noti_type,
  get_json_object(event_json,'$.kv.ap_time') ap_time,
  get_json_object(event_json,'$.kv.content') content,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='notification';

3) Test
  select * from dwd_notification_log where dt='2020-03-10' limit 2;

1.3.6 Background-active user table
1) DDL
  drop table if exists dwd_active_background_log;
  CREATE EXTERNAL TABLE dwd_active_background_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `active_source` string,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_background_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_active_background_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.active_source') active_source,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='active_background';

3) Test
  select * from dwd_active_background_log where dt='2020-03-10' limit 2;

1.3.7 Comment table

1) DDL
  drop table if exists dwd_comment_log;
  CREATE EXTERNAL TABLE dwd_comment_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `comment_id` int,
  `userid` int,
  `p_comment_id` int,
  `content` string,
  `addtime` string,
  `other_id` int,
  `praise_count` int,
  `reply_count` int,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_comment_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_comment_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.comment_id') comment_id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
  get_json_object(event_json,'$.kv.content') content,
  get_json_object(event_json,'$.kv.addtime') addtime,
  get_json_object(event_json,'$.kv.other_id') other_id,
  get_json_object(event_json,'$.kv.praise_count') praise_count,
  get_json_object(event_json,'$.kv.reply_count') reply_count,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='comment';

3) Test
  select * from dwd_comment_log where dt='2020-03-10' limit 2;

1.3.8 Favorites table

1) DDL
  drop table if exists dwd_favorites_log;
  CREATE EXTERNAL TABLE dwd_favorites_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `id` int,
  `course_id` int,
  `userid` int,
  `add_time` string,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_favorites_log/'
  TBLPROPERTIES('parquet.compression'='lzo');

2) Load data
  insert overwrite table dwd_favorites_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.id') id,
  get_json_object(event_json,'$.kv.course_id') course_id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.add_time') add_time,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='favorites';

3) Test
  select * from dwd_favorites_log where dt='2020-03-10' limit 2;

1.3.9 Like (praise) table

1) DDL
  drop table if exists dwd_praise_log;
  CREATE EXTERNAL TABLE dwd_praise_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `id` string,
  `userid` string,
  `target_id` string,
  `type` string,
  `add_time` string,
  `server_time` string
  )
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_praise_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_praise_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.id') id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.target_id') target_id,
  get_json_object(event_json,'$.kv.type') type,
  get_json_object(event_json,'$.kv.add_time') add_time,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='praise';

3) Test
  select * from dwd_praise_log where dt='2020-03-10' limit 2;

1.3.10 Error log table
1) DDL
  drop table if exists dwd_error_log;
  CREATE EXTERNAL TABLE dwd_error_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `errorBrief` string,
  `errorDetail` string,
  `server_time` string)
  PARTITIONED BY (dt string)
  stored as parquet
  location '/warehouse/gmall/dwd/dwd_error_log/'
  TBLPROPERTIES('parquet.compression'='lzo');
2) Load data
  insert overwrite table dwd_error_log PARTITION (dt='2020-03-10')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.errorBrief') errorBrief,
  get_json_object(event_json,'$.kv.errorDetail') errorDetail,
  server_time
  from dwd_base_event_log
  where dt='2020-03-10' and event_name='error';

3) Test
  select * from dwd_error_log where dt='2020-03-10' limit 2;

1.3.11 Data loading script for the DWD-layer event tables

1) vim ods_to_dwd_event_log.sh
Add the following content to the script:


  #!/bin/bash
  # Define variables for easy modification
  APP=gmall
  hive=/opt/modules/hive/bin/hive
  # If a date is passed in, use it; otherwise use the day before today
  if [ -n "$1" ] ;then
  do_date=$1
  else
  do_date=`date -d "-1 day" +%F`
  fi
  sql="
  insert overwrite table "$APP".dwd_display_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.goodsid') goodsid,
  get_json_object(event_json,'$.kv.place') place,
  get_json_object(event_json,'$.kv.extend1') extend1,
  get_json_object(event_json,'$.kv.category') category,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='display';
  insert overwrite table "$APP".dwd_newsdetail_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.entry') entry,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.goodsid') goodsid,
  get_json_object(event_json,'$.kv.showtype') showtype,
  get_json_object(event_json,'$.kv.news_staytime') news_staytime,
  get_json_object(event_json,'$.kv.loading_time') loading_time,
  get_json_object(event_json,'$.kv.type1') type1,
  get_json_object(event_json,'$.kv.category') category,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='newsdetail';
  insert overwrite table "$APP".dwd_loading_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.loading_time') loading_time,
  get_json_object(event_json,'$.kv.loading_way') loading_way,
  get_json_object(event_json,'$.kv.extend1') extend1,
  get_json_object(event_json,'$.kv.extend2') extend2,
  get_json_object(event_json,'$.kv.type') type,
  get_json_object(event_json,'$.kv.type1') type1,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='loading';
  insert overwrite table "$APP".dwd_ad_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.entry') entry,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.contentType') contentType,
  get_json_object(event_json,'$.kv.displayMills') displayMills,
  get_json_object(event_json,'$.kv.itemId') itemId,
  get_json_object(event_json,'$.kv.activityId') activityId,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='ad';
  insert overwrite table "$APP".dwd_notification_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.action') action,
  get_json_object(event_json,'$.kv.noti_type') noti_type,
  get_json_object(event_json,'$.kv.ap_time') ap_time,
  get_json_object(event_json,'$.kv.content') content,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='notification';
  insert overwrite table "$APP".dwd_active_background_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.active_source') active_source,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='active_background';
  insert overwrite table "$APP".dwd_comment_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.comment_id') comment_id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
  get_json_object(event_json,'$.kv.content') content,
  get_json_object(event_json,'$.kv.addtime') addtime,
  get_json_object(event_json,'$.kv.other_id') other_id,
  get_json_object(event_json,'$.kv.praise_count') praise_count,
  get_json_object(event_json,'$.kv.reply_count') reply_count,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='comment';
  insert overwrite table "$APP".dwd_favorites_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.id') id,
  get_json_object(event_json,'$.kv.course_id') course_id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.add_time') add_time,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='favorites';
  insert overwrite table "$APP".dwd_praise_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.id') id,
  get_json_object(event_json,'$.kv.userid') userid,
  get_json_object(event_json,'$.kv.target_id') target_id,
  get_json_object(event_json,'$.kv.type') type,
  get_json_object(event_json,'$.kv.add_time') add_time,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='praise';
  insert overwrite table "$APP".dwd_error_log
  PARTITION (dt='$do_date')
  select
  mid_id,
  user_id,
  version_code,
  version_name,
  lang,
  source,
  os,
  area,
  model,
  brand,
  sdk_version,
  gmail,
  height_width,
  app_time,
  network,
  lng,
  lat,
  get_json_object(event_json,'$.kv.errorBrief') errorBrief,
  get_json_object(event_json,'$.kv.errorDetail') errorDetail,
  server_time
  from "$APP".dwd_base_event_log
  where dt='$do_date' and event_name='error';
  "
  $hive -e "$sql"

2) Make the script executable
  chmod 770 ods_to_dwd_event_log.sh
3) Run the script
  ods_to_dwd_event_log.sh 2020-03-11
4) Check the imported data
  select * from dwd_comment_log where dt='2020-03-11' limit 2;

Closing remarks

This chapter parsed the ODS-layer user behavior data, built the corresponding DWD-layer tables and loaded them. The next chapter will parse the ODS-layer business data and load it into the DWD layer as well.

Author: qq_43733123
Source: https://blog.csdn.net/qq_43733123/article/details/105904530



