分享

企业级数据仓库构建(三):数据采集模块环境搭建

问题导读:

1. 数仓数据采集模块集群规划是怎么样的?
2. 数仓日志生成模块的配置是怎么样的?
3. 数仓数据采集模块Kafka如何进行优化?


上一篇企业级数据仓库构建(二)
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29509


一、数据采集模块

【1】Linux环境搭建

Linux配置请看这篇博客Linux基本配置

20200428153222246.png

【2】Hadoop环境搭建

20200428221758927.png

1)基础环境创建
  1. [node01]
  2. cd ~
  3. mkdir bin
  4. cd bin
  5. vim xsync
  6. =======================脚本如下========================
  7. #!/bin/bash
  8. #1 获取输入参数个数,如果没有参数,直接退出
  9. pcount=$#
  10. if((pcount==0)); then
  11. echo no args;
  12. exit;
  13. fi
  14. #2 获取文件名称
  15. p1=$1
  16. fname=`basename $p1`
  17. echo fname=$fname
  18. #3 获取上级目录到绝对路径
  19. pdir=`cd -P $(dirname $p1); pwd`
  20. echo pdir=$pdir
  21. #4 获取当前用户名称
  22. user=`whoami`
  23. #5 循环
  24. for((host=101; host<104; host++)); do
  25.         echo ------------------- hadoop$host --------------
  26.         rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
  27. done
  28. =======================脚本如上========================
  29. chmod 770 xsync
  30. sudo rm -rf /opt/*
  31. sudo mkdir /opt/modules
  32. sudo mkdir /opt/software
  33. sudo mkdir -p /opt/tmp/logs
  34. sudo chown zsy:zsy -R /opt
  35. xsync /opt/*
  36. [node02/node03]
  37. sudo chown zsy:zsy -R /opt
复制代码

2)JDK安装


注意: 安装之前,请先删除系统自带的JDK,查看博客【Linux】Linux卸载自带的OpenJDK
  1. [node01]
  2. tar -zxf /opt/software/jdk-8u144-linux-x64.tar.gz -C /opt/modules
  3. sudo vim /etc/profile.d/java.sh
  4. export JAVA_HOME=/opt/modules/jdk1.8.0_144
  5. export PATH=$PATH:$JAVA_HOME/bin
  6. source /etc/profile
  7. xsync /opt/modules/jdk1.8.0_144
  8. sudo scp /etc/profile.d/java.sh node02:/etc/profile
  9. sudo scp /etc/profile.d/java.sh node03:/etc/profile
  10. [node02/node03]
  11. source /etc/profile
复制代码

注意可以看到,我在上面添加的JDK环境变量是在/etc/profile.d目录下创建了一个以.sh结尾的文件,那么为什么可以这么做呢?

我们首先说一下环境变量的配置方式有哪些

1)修改/etc/profile文件:用来设置系统环境参数,比如$PATH,这里面的环境变量是对系统内所有用户生效。使用bash命令,需要source /etc/profile一下

2)修改~/.bashrc文件:针对某一个特定的用户,环境变量的设置只对该用户自己有效,使用bash命令,只要以该用户身份运行命令行就会读取该文件,该文件会去加载/etc/bashrc文件,该文件会遍历/etc/profile.d文件下的以.sh文件结尾的文件,将其中的环境变量添加到/etc/bashrc文件中,所以我们把配置的环境变量添加到/etc/profile.d目录下

3)说明:
登录式Shell:采用用户名登录,会自动加载/etc/profile
非登录式Sehll:采用ssh登录,不会自动加载/etc/profile,会自动加载~/.backrc

3)Zookeeper安装
具体安装方式请点击博客【Zookeeper】Zookeeper入门解析

4)Hadoop安装
具体安装方式请点击博客【Hadoop】HadoopHA高可用完全分布式搭建

5)Flume安装
具体安装方式请点击博客 【Flume】Flume入门解析(一)

说明:
【1】Source

1)Taildir Source 和 Exec Source 如何选择?

Taildir Source 相比Exec Sgurce、Spooling Directory Source的优势TailDir Source:断点续传、多目录。Flumel.6以前需要自己自定义Source 记录每次读取文件位置,实现断点续传

Exec Source 可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失
Spooling Directory Source监控目录,不支持断点续传

2)batchSize 大小如何设置?

Event1K左右时,500-1000合适(默认为100)

【2】Channel

采用Kafka Channel,省去了Sink,提高了效率
注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent 配置为true还是false,都会转为FlumeEvent。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可

【3】架构图

20200429095050516.png

【4】编辑Flume采集日志数据发送到Kafka配置文件(记得同步配置)
  1. # 说明1:我们使用 TAILDIR Source 监控多目录,自动实现断点续传,版本需要在1.7+
  2. # 说明2:我们使用 Kafka Channel,不使用Kafka Sink,提高效率
  3. a1.sources=r1
  4. a1.channels=c1 c2
  5. # configure source
  6. a1.sources.r1.type = TAILDIR
  7. # 断点续传持久化目录
  8. a1.sources.r1.positionFile = /opt/modules/flume/log_position/log_position.json
  9. # 设置需要监控的多个目录,我们只需要一个,所以只添加一个 f1
  10. a1.sources.r1.filegroups = f1
  11. # 设置 f1 对应的监控目录
  12. a1.sources.r1.filegroups.f1 = /opt/tmp/logs/app.+
  13. a1.sources.r1.fileHeader = true
  14. a1.sources.r1.channels = c1 c2
  15. # interceptor 添加拦截器
  16. a1.sources.r1.interceptors = i1 i2
  17. # 自定义拦截器
  18. a1.sources.r1.interceptors.i1.type = com.zsy.flume.interceptor.LogETLInterceptor$Builder   # ETL拦截器
  19. a1.sources.r1.interceptors.i2.type = com.zsy.flume.interceptor.LogTypeInterceptor$Builder  #日志类型拦截器
  20. # 根据header头信息,将source数据发送到不同的 Channel
  21. a1.sources.r1.selector.type = multiplexing
  22. a1.sources.r1.selector.header = topic
  23. a1.sources.r1.selector.mapping.topic_start = c1
  24. a1.sources.r1.selector.mapping.topic_event = c2
  25. # configure channel
  26. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  27. a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  28. a1.channels.c1.kafka.topic = topic_start   # 日志类型是start,数据发往 channel1
  29. a1.channels.c1.parseAsFlumeEvent = false
  30. a1.channels.c1.kafka.consumer.group.id = flume-consumer
  31. a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
  32. a1.channels.c2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  33. a1.channels.c2.kafka.topic = topic_event  # 日志类型是event,数据发往 channel2
  34. a1.channels.c2.parseAsFlumeEvent = false
  35. a1.channels.c2.kafka.consumer.group.id = flume-consumer
复制代码

【5】自定义拦截器


创建Maven项目,添加如下依赖:
  1. <dependencies>
  2.         <dependency>
  3.             <groupId>org.apache.flume</groupId>
  4.             <artifactId>flume-ng-core</artifactId>
  5.             <version>1.7.0</version>
  6.         </dependency>
  7.     </dependencies>
  8.     <build>
  9.         <plugins>
  10.             <plugin>
  11.                 <artifactId>maven-compiler-plugin</artifactId>
  12.                 <version>2.3.2</version>
  13.                 <configuration>
  14.                     <source>1.8</source>
  15.                     <target>1.8</target>
  16.                 </configuration>
  17.             </plugin>
  18.             <plugin>
  19.                 <artifactId>maven-assembly-plugin</artifactId>
  20.                 <configuration>
  21.                     <descriptorRefs>
  22.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  23.                     </descriptorRefs>
  24.                 </configuration>
  25.                 <executions>
  26.                     <execution>
  27.                         <id>make-assembly</id>
  28.                         <phase>package</phase>
  29.                         <goals>
  30.                             <goal>single</goal>
  31.                         </goals>
  32.                     </execution>
  33.                 </executions>
  34.             </plugin>
  35.         </plugins>
  36.     </build>
复制代码

自定义拦截器步骤:
① 定义一个类,实现Flume的Interceptor接口
② 重写4个方法

  • 初始化
  • 单Event处理
  • 多Event处理
  • 关闭资源

③ 创建静态内部类,返回当前类对象
④ 打包上传

代码如下

1)com.zsy.flume.interceptor.LogETLInterceptor
  1. package com.zsy.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.nio.charset.Charset;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. public class LogETLInterceptor implements Interceptor {
  9.     @Override
  10.     public void initialize() {
  11.     }
  12.     @Override
  13.     public Event intercept(Event event) {
  14.         // 清洗数据 ETL
  15.         // 1.获取日志
  16.         byte[] body = event.getBody();
  17.         String log = new String(body, Charset.forName("UTF-8"));
  18.         // 2.区分类型处理
  19.         if (log.contains("start")) {
  20.             // 验证启动日志逻辑
  21.             if (LogUtils.validateStart(log)) {
  22.                 return event;
  23.             }
  24.         } else {
  25.             // 验证事件日志逻辑
  26.             if (LogUtils.validateEvent(log)) {
  27.                 return event;
  28.             }
  29.         }
  30.         return null;
  31.     }
  32.     @Override
  33.     public List<Event> intercept(List<Event> events) {
  34.         ArrayList<Event> interceptors = new ArrayList<>();
  35.         // 多event处理
  36.         for (Event event : events) {
  37.             Event intercept = intercept(event);
  38.             if (intercept != null) {
  39.                 interceptors.add(intercept);
  40.             }
  41.         }
  42.         return interceptors;
  43.     }
  44.     @Override
  45.     public void close() {
  46.     }
  47.     public static class Builder implements Interceptor.Builder {
  48.         @Override
  49.         public Interceptor build() {
  50.             return new LogETLInterceptor();
  51.         }
  52.         @Override
  53.         public void configure(Context context) {
  54.         }
  55.     }
  56. }
复制代码
2)com.zsy.flume.interceptor.LogTypeInterceptor
  1. package com.zsy.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.nio.charset.Charset;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. import java.util.Map;
  9. public class LogTypeInterceptor implements Interceptor {
  10.     @Override
  11.     public void initialize() {
  12.     }
  13.     @Override
  14.     public Event intercept(Event event) {
  15.         // 区分类型 start event
  16.         // header body
  17.         byte[] body = event.getBody();
  18.         String log = new String(body, Charset.forName("UTF-8"));
  19.         // 获取头信息
  20.         Map<String, String> headers = event.getHeaders();
  21.         // 业务逻辑判断
  22.         if(log.contains("start")){
  23.             headers.put("topic","topic_start");
  24.         }else{
  25.             headers.put("topic","topic_event");
  26.         }
  27.         return event;
  28.     }
  29.     @Override
  30.     public List<Event> intercept(List<Event> events) {
  31.         ArrayList<Event> interceptors = new ArrayList<>();
  32.         for (Event event : events) {
  33.             Event intercept = intercept(event);
  34.             interceptors.add(intercept);
  35.         }
  36.         return interceptors;
  37.     }
  38.     @Override
  39.     public void close() {
  40.     }
  41.     public static class Builder implements Interceptor.Builder{
  42.         @Override
  43.         public Interceptor build() {
  44.             return new LogTypeInterceptor();
  45.         }
  46.         @Override
  47.         public void configure(Context context) {
  48.         }
  49.     }
  50. }
复制代码
3)工具类
  1. package com.zsy.flume.interceptor;
  2. import org.apache.commons.lang.math.NumberUtils;
  3. public class LogUtils {
  4.     // 验证启动日志逻辑
  5.     public static boolean validateStart(String log) {
  6.         if (log == null) {
  7.             return false;
  8.         }
  9.         // 判断数据是否是 { 开头 ,是否是 } 结尾
  10.         if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
  11.             return false;
  12.         }
  13.         return true;
  14.     }
  15.     // 验证事件日志逻辑
  16.     public static boolean validateEvent(String log) {
  17.         // 判断数据是否是 { 开头 ,是否是 } 结尾
  18.         // 服务器事件 | 日志内容
  19.         if (log == null) {
  20.             return false;
  21.         }
  22.         // 切割
  23.         String[] logContents = log.split("\\|");
  24.         if(logContents.length != 2){
  25.             return false;
  26.         }
  27.         // 校验服务器时间(长度必须是13位,必须全部是数字)
  28.         if(logContents[0].length() != 13 || !logContents[0].matches("[0-9]{13}")){
  29. //        if(logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])){
  30.             return false;
  31.         }
  32.         // 校验日志格式
  33.         if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
  34.             return false;
  35.         }
  36.         return true;
  37.     }
  38. }
复制代码
4)打包将不带依赖的jar包上传到 Flume的 lib 目录下即可,flume启动会自动加载 lib下的所有jar包

【6】Flume启动/停止脚本
  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in node01 node02
  5. do
  6. echo " --------启动 $i 采集 flume-------"
  7. ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf-file /opt/modules/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
  8. done
  9. };;
  10. "stop"){
  11. for i in node01 node02
  12. do
  13. echo " --------停止 $i 采集 flume-------"
  14. ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
  15. done
  16. };;
  17. esac
  18. 说明1:nohub该命令表示在退出账户/关闭终端后继续运行相应的进程,意味不挂起,不挂断地运行命令
  19. 说明2:awk默认分隔符为空格
  20. 说明3:xargs 表示取出前面命令运行地结果,作为后面命令地输入参数
复制代码

6)Kafka安装

具体安装方式请点击博客【Kafka】Kafka入门解析(一)

7)日志生成

前提:将我们之前写好的日志数据生成代码打包放到服务器上

日志启动

【1】代码参数说明
  1. // 参数一:控制发送每条的延时时间,默认是 0
  2. Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
  3. // 参数二:循环遍历次数
  4. int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
复制代码
【2】 将 生 成 的 jar 包 log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 拷 贝 到
node01 服务器 /opt 目录下,并同步到 node02

【3】在 node01 上执行 jar 程序

方式1:
  1. java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.zsy.appclient.AppMain  > /dev/null 2>&1
  2. 说明:
  3. 如果打包时没有指定主函数,则使用 -classpath,并在 jar 包后面指定主函数全类名
复制代码
方式2:
  1. java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar > /dev/null 2>&1
  2. 说明:
  3. 如果打包时指定了主函数,则可以使用 -jar ,此时不用指定主函数的全类名
复制代码
说明:/dev/null 代表 linux 的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”

  • 标准输入 0:从键盘获得输入 /proc/self/fd/0
  • 标准输出 1:输出到屏幕(即控制台) /proc/self/fd/1
  • 错误输出 2:输出到屏幕(即控制台) /proc/self/fd/2

【4】查看日志数据,在我们指定好的目录/opt/tmp/logs目录下查看数据

【5】脚本

我们为了方便使用,就通过脚本来实现数据的生成!

日志数据生成脚本如下
  1. #! /bin/bash
  2. for i in node01 node02
  3. do
  4.         echo "========== $i 生成日志数据中... =========="
  5.         ssh $i "java -jar /opt/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar  $1 $2  >/dev/null 2>&1 &"
  6. done
复制代码
时间同步脚本(临时脚本,只是后续为了时间同步而需要的)如下
  1. #!/bin/bash
  2. for i in node01 node02 node03
  3. do
  4.         echo "========== $i =========="
  5.         ssh -t $i "sudo date -s $1"
  6. done
  7. 参数说明:
  8. 我们可以发现上面的参数中我们使用了 -t ,是因为我们使用了 sudo ,所以需要使用 -t 参数来形成虚拟终端,不需要深究,只要使用 sudo,在ssh后面添加 -t 接口
复制代码
8)Flume消费Kafka数据存储在HDFS

【1】我们在前面配置了node01、node02采集日志数据传输到Kafka,现在我们需要在node03消费Kafka数据存储到HDFS上
架构图如下

20200429172405323.png

Flume配置如下
  1. ## 组件
  2. a1.sources=r1 r2
  3. a1.channels=c1 c2
  4. a1.sinks=k1 k2
  5. ## source1
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. a1.sources.r1.batchSize = 5000
  8. a1.sources.r1.batchDurationMillis = 2000
  9. a1.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  10. a1.sources.r1.kafka.topics = topic_start
  11. ## source2
  12. a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
  13. a1.sources.r2.batchSize = 5000
  14. a1.sources.r2.batchDurationMillis = 2000
  15. a1.sources.r2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  16. a1.sources.r2.kafka.topics = topic_event
  17. ## channel1
  18. a1.channels.c1.type = file
  19. a1.channels.c1.checkpointDir = /opt/modules/flume/checkpoint/behavior1
  20. a1.channels.c1.dataDirs = /opt/modules/flume/data/behavior1/
  21. a1.channels.c1.maxFileSize = 2146435071
  22. a1.channels.c1.capacity = 1000000
  23. a1.channels.c1.keep-alive = 6
  24. ## channel2
  25. a1.channels.c2.type = file
  26. a1.channels.c2.checkpointDir = /opt/modules/flume/checkpoint/behavior2
  27. a1.channels.c2.dataDirs = /opt/modules/flume/data/behavior2/
  28. a1.channels.c2.maxFileSize = 2146435071
  29. a1.channels.c2.capacity = 1000000
  30. a1.channels.c2.keep-alive = 6
  31. ## sink1
  32. a1.sinks.k1.type = hdfs
  33. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
  34. a1.sinks.k1.hdfs.filePrefix = logstart-
  35. ##sink2
  36. a1.sinks.k2.type = hdfs
  37. a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
  38. a1.sinks.k2.hdfs.filePrefix = logevent-
  39. ## 不要产生大量小文件
  40. a1.sinks.k1.hdfs.rollInterval = 3600
  41. a1.sinks.k1.hdfs.rollSize = 134217728
  42. a1.sinks.k1.hdfs.rollCount = 0
  43. a1.sinks.k2.hdfs.rollInterval = 10
  44. a1.sinks.k2.hdfs.rollSize = 134217728
  45. a1.sinks.k2.hdfs.rollCount = 0
  46. ## 控制输出文件是原生文件。
  47. a1.sinks.k1.hdfs.fileType = CompressedStream
  48. a1.sinks.k2.hdfs.fileType = CompressedStream
  49. a1.sinks.k1.hdfs.codeC = lzop
  50. a1.sinks.k2.hdfs.codeC = lzop
  51. ## 拼装
  52. a1.sources.r1.channels = c1
  53. a1.sinks.k1.channel= c1
  54. a1.sources.r2.channels = c2
  55. a1.sinks.k2.channel= c2
复制代码
【2】FileChannel 和 MemoryChannel 区别

  • MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求
  • FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从失败中恢复数据

【3】FileChannel 优化

通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量
官方说明如下:
  1. Comma separated list of directories for storing log files. Using
  2. multiple directories on separate disks can improve file channel
  3. peformance
复制代码
checkpointDirbackupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证
checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据

【4】Sink:HDFS Sink

  • (1)HDFS 存入大量小文件,有什么影响?

  • 元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用Namenode 服务器大量内存,影响 Namenode 性能和使用寿命
  • 计算层面:默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间

  • (2)HDFS 小文件处理

官方默认的这三个参数配置写入 HDFS 后会产生小文件,hdfs.rollInterval、hdfs.rollSize、
hdfs.rollCount
基于以上 hdfs.rollInterval=3600hdfs.rollSize=134217728hdfs.rollCount =0 几个参数综
合作用,效果如下:

  • (1)文件在达到 128M 时会滚动生成新文件
  • (2)文件创建超 3600 秒时会滚动生成新文件

9)数据生产!!!

终于,在前面铺垫了那么多之后,我们终于可以生产数据,并把数据存储到HDFS上了,现在我们来整理整体流程!!!

流程如下

1)启动Zookeeper
2)启动Hadoop集群
3)启动Kafka
4)启动Flume
5)生产数据

此时我们可以去HDFS上查看数据了

20200429213404216.png

Flume 内存优化

1)问题描述:如果启动消费 Flume 抛出如下异常
  1. ERROR hdfs.HDFSEventSink: process failed
  2. java.lang.OutOfMemoryError: GC overhead limit exceeded
复制代码
2)解决方案步骤:

  • (1)在 node01 服务器的 /opt/modules/flume/conf/flume-env.sh 文件中增加如下配置
  1. export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
复制代码
  • (2)同步配置到其他服务器

3)Flume 内存参数设置及优化

  • JVM heap 一般设置为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)
  • -Xmx 与-Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 fullgc
  • -Xms 表示 JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示 JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc

数据采集通道启动/停止脚本

1)vim cluster.sh
  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. echo " -------- 启动 集群 -------"
  5. #启动 Zookeeper 集群
  6. zk.sh start
  7. sleep 1s;
  8. echo " -------- 启动 hadoop 集群 -------"
  9. /opt/modules/hadoop/sbin/start-dfs.sh
  10. ssh node02 "/opt/modules/hadoop/sbin/start-yarn.sh"
  11. sleep 7s;
  12. #启动 Flume 采集集群
  13. f1.sh start1
  14. #启动 Kafka 采集集群
  15. kk.sh start
  16. sleep 7s;
  17. #启动 Flume 消费集群
  18. f2.sh start
  19. };;
  20. "stop"){
  21. echo " -------- 停止 集群 -------"
  22. #停止 Flume 消费集群
  23. f2.sh stop
  24. #停止 Kafka 采集集群
  25. kk.sh stop
  26. sleep 7s;
  27. #停止 Flume 采集集群
  28. f1.sh stop
  29. echo " -------- 停止 hadoop 集群 -------"
  30. ssh node02 "/opt/modules/hadoop/sbin/stop-yarn.sh"
  31. /opt/modules/hadoop/sbin/stop-dfs.sh
  32. sleep 7s;
  33. #停止 Zookeeper 集群
  34. zk.sh stop
  35. };;
  36. esac
复制代码
结束语
至此,我们数据生产并清洗传输到HDFS结束了
后续,我们需要开始搭建Hive,进行建模了,敬请期待下一篇博客!



加微信w3aboutyun,获取更多资源



领取100本书+1T资源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26480

大数据5个项目视频
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25235

名企资源、名企面试题、最新BAT面试题、专题面试题等资源汇总
https://www.aboutyun.com/forum.php?mod=viewthread&tid=27732


文章来源:https://blog.csdn.net/qq_43733123/article/details/105813953







没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条