分享

hadoop 学习笔记(1)

本帖最后由 PeersLee 于 2016-3-23 15:59 编辑
问题导读:

1.如何用mr 实现倒排索引?
2.什么是zookeeper?
3.怎样安装Zookeeper并运行?

4.何为 hadoop HA(High Available)模式?





解决方案:

mr 实现倒排索引:

  • 第一步:

代码:

[mw_shl_code=java,true]package peerslee.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class InverseIndexStepOne {
        //mapper
        /*
         * 提取文件名,分割单词,将每个单词和文件名以相应的格式写入reducer中
         */
        public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
                @Override
                protected void map(LongWritable key, Text value,Context context)
                                throws IOException, InterruptedException {
                        //按行取数据
                        String line = value.toString();
                        //截取(注意截取的相隔符号)
                        String []fields = StringUtils.split(line,' ');
                        //得到文件切片
                        FileSplit fileSplit = (FileSplit) context.getInputSplit();
                        //得到文件名
                        String fileName = fileSplit.getPath().getName();
                        
                        //将单词和文件名循环写入Reducer,每个单词一次
                        for(String field :fields) {
                                context.write(new Text(field + "->" + fileName), new LongWritable(1) );
                        }
                }
        }
        
        //reducer
        /*
         * 将每个单词出现的次数进行累加
         */
        public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
                @Override
                protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
                        long sum = 0;
                        for(LongWritable value : values) {
                                sum += value.get();
                        }
                        context.write(key, new LongWritable(sum));
                }
        }
        
        //runner
        public static void main(String[] args) throws Exception {
                //job
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
               
                //set
                job.setJarByClass(InverseIndexStepOne.class);
               
                job.setMapperClass(StepOneMapper.class);
                job.setReducerClass(StepOneReducer.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
               
                //path
                FileInputFormat.setInputPaths(job, new Path(args[0]));
               
                /*
                 * 如果存在输出路径,HDFS将删除该文件夹
                 */
                 Path outputPath = new Path(args[1]);
                 FileSystem fs = FileSystem.get(conf);
               
                 if(fs.exists(outputPath)) {
                         fs.delete(outputPath,true);
                 }
                 FileOutputFormat.setOutputPath(job, outputPath);
                 
                 System.exit(job.waitForCompletion(true)?0:1);
               
        }
}
[/mw_shl_code]

测试:

[mw_shl_code=shell,true][hadoop@master ~]$ hadoop fs -ls /ii
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2016-03-15 02:16 /ii/ii_input
[hadoop@master ~]$ hadoop jar ii.jar peerslee.hadoop.mr.ii.InverseIndexStepOne /ii/ii_input /ii/ii_output
16/03/15 02:18:45 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/15 02:18:45 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/15 02:18:46 INFO input.FileInputFormat: Total input paths to process : 3
16/03/15 02:18:46 INFO mapreduce.JobSubmitter: number of splits:3
16/03/15 02:18:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458032748939_0001
16/03/15 02:18:47 INFO impl.YarnClientImpl: Submitted application application_1458032748939_0001
16/03/15 02:18:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1458032748939_0001/
16/03/15 02:18:47 INFO mapreduce.Job: Running job: job_1458032748939_0001
16/03/15 02:18:58 INFO mapreduce.Job: Job job_1458032748939_0001 running in uber mode : false
16/03/15 02:18:58 INFO mapreduce.Job:  map 0% reduce 0%
16/03/15 02:19:11 INFO mapreduce.Job:  map 100% reduce 0%
16/03/15 02:19:22 INFO mapreduce.Job:  map 100% reduce 100%
16/03/15 02:19:23 INFO mapreduce.Job: Job job_1458032748939_0001 completed successfully
16/03/15 02:19:23 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=970
                FILE: Number of bytes written=372369
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=547
                HDFS: Number of bytes written=499
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=3
                Launched reduce tasks=1
                Data-local map tasks=3
                Total time spent by all maps in occupied slots (ms)=33995
                Total time spent by all reduces in occupied slots (ms)=7410
                Total time spent by all map tasks (ms)=33995
                Total time spent by all reduce tasks (ms)=7410
                Total vcore-seconds taken by all map tasks=33995
                Total vcore-seconds taken by all reduce tasks=7410
                Total megabyte-seconds taken by all map tasks=34810880
                Total megabyte-seconds taken by all reduce tasks=7587840
        Map-Reduce Framework
                Map input records=13
                Map output records=43
                Map output bytes=878
                Map output materialized bytes=982
                Input split bytes=303
                Combine input records=0
                Combine output records=0
                Reduce input groups=34
                Reduce shuffle bytes=982
                Reduce input records=43
                Reduce output records=34
                Spilled Records=86
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=704
                CPU time spent (ms)=2250
                Physical memory (bytes) snapshot=534474752
                Virtual memory (bytes) snapshot=1449422848
                Total committed heap usage (bytes)=378744832
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=244
        File Output Format Counters
                Bytes Written=499
[hadoop@master ~]$ hadoop fs -ls /ii/ii_output
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2016-03-15 02:19 /ii/ii_output/_SUCCESS
-rw-r--r--   1 hadoop supergroup        499 2016-03-15 02:19 /ii/ii_output/part-r-00000
[hadoop@master ~]$ hadoop fs -cat /ii/ii_output/part-r-00000
Happiness->C.txt        1
May->B.txt      1
There->A.txt    1
along->C.txt    1
are->A.txt      1
cry->C.txt      1
enough->B.txt   2
for->C.txt      1
happiness->B.txt        1
have->B.txt     1
human->B.txt    1
hurt->C.txt     1
in->A.txt       1
keep->B.txt     1
lies->C.txt     1
life->A.txt     1
make->B.txt     2
miss->A.txt     1
moments->A.txt  1
much->A.txt     1
so->A.txt       1
someone->A.txt  1
sorrow->B.txt   1
strong,enough->B.txt    1
sweet->B.txt    1
their->C.txt    1
those->C.txt    2
to->B.txt       3
trials->B.txt   1
way->C.txt      1
when->A.txt     1
who->C.txt      2
you->A.txt      1
you->B.txt      4[/mw_shl_code]

————————————————————————————————————————

  • 第二步:


代码:

[mw_shl_code=java,true]package peerslee.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InversIndexStepTwo {
        //mapper
        public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
                /*
                 * 输入的格式 <word->filename        count>
                 */
                @Override
                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        String line = value.toString();
                        String []fields = StringUtils.split(line,"\t");
                        
                        String []word_filename = StringUtils.split(fields[0],"->");
                        
                        //word,filename,count
                        String word = word_filename[0];
                        String filename = word_filename[1];
                        long count = Long.parseLong(fields[1]);
                        
                        context.write(new Text(word), new Text(filename+"->"+count));
                        
                }
        }
        
        //reducer
        public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {
                /*
                 * 输入格式<word        filename->count>
                 */
                @Override
                protected void reduce(Text key, Iterable<Text> values, Context context)
                                throws IOException, InterruptedException {
                        //统计某个单词在所有文本中出现的次数
                        String result = "";
                        
                        for (Text value : values) {
                                result += value + " ";
                        }
                        /*
                         * 输出格式<word        filename->count        filename1->count1>
                         */
                        context.write(key ,new Text(result));
                }
        }
        
        //runnner
        public static void main(String[] args) throws Exception {
                //job
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
               
                //set
                job.setJarByClass(InversIndexStepTwo.class);
                job.setMapperClass(StepTwoMapper.class);
                job.setReducerClass(StepTwoReducer.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
               
                //path
                FileInputFormat.setInputPaths(job, args[0]);
               
                Path outputPath = new Path(args[1]);
                FileSystem fs = FileSystem.get(conf);
               
                if(fs.exists(outputPath)) {
                        fs.delete(outputPath,true);
                }
               
                //waitforcompletion
                System.exit(job.waitForCompletion(true)?0:1);
               
        }
}
[/mw_shl_code]

测试:


2016-03-15_180642.png






ZooKeeper:



a08b87d6277f9e2ff86730891f30e924b999f3e5.jpg


  • ZooKeeper的简介:


  • 简洁性:Zookeeper的和兴是精简的文件系统,提供一些简单的操作和一些额外的抽线操作,比如排序和通知。
  • 高可用性:Zookeeper运行在一组机器上,他可以帮助系统避免出现单节点故障,一次可以用于构建一个可靠的应用程序。
  • 采用松耦合交互方式:在Zookeeper支持的交互过程中,参与者不需要彼此了解。
  • Zookeeper是一个资源库:它提供了一个 通用协调模式实现方法的开源共享库。




  • 初识Zookeeper:


假设每一个应用(client)中存储的配置信息都相同,在集群运行的过程当中其中一个节点的应用发生了改变的时候,配置信息同样发生改变,要保证所有节点的数据(配置信息)的一致性,我们可以将每一个节点中保存的配置文件全都抛到Zookeeper中,所有的节点应用都通过这个Zookeeper来访问这份数据,Zookeeper集群会保证这个应用的集群的数据(配置信息)一致性。

2016-03-14_150453.png





Zookeeper的安装和简单使用:

  • 安装:

(1)解压:

[mw_shl_code=shell,true]tar -xvzf zookeeper-3.4.5.tar.gz -C ~/app/[/mw_shl_code]

(2)配置:

2.1
添加一个zoo.cfg配置文件在 $ZOOKEEPER/conf 下

[mw_shl_code=shell,true]
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg

需要修改的变量:
#心跳的时间间隔
  tickTime=2000
  #启动时,如果超过10次心跳对方节点没有反应,就判断其为启动失败
   initLimit=10
  #运行时,如果超过5次心跳对方节点没有反应,就判断其为启动失败
   syncLimit=5
   dataDir=/home/hadoop/app/zookeeper-3.4.5/data
   #客户端通过哪一个端口来连接
   clientPort=2181
   #指定集群中的机器
   #2888 leader 与 flower 之间通信的端口
   #3888 选举机制的使用的端口 -> 多数票者成为leader
   server.1=master:2888:3888[/mw_shl_code]

2.2
在 zoo.cfg 文件中的dataDir(dataDir=/home/hadoop/app/zookeeper-3.4.5/data)创建myid 并将 1 写进去(其他节点就是2、3...这个可以自己设置)


[mw_shl_code=shell,true][hadoop@master zookeeper-3.4.5]$ mkdir data
[hadoop@master zookeeper-3.4.5]$ echo 1 > myid
[hadoop@master zookeeper-3.4.5]$ cat myid
1
[hadoop@master zookeeper-3.4.5]$ pwd
/home/hadoop/app/zookeeper-3.4.5
[hadoop@master zookeeper-3.4.5]$ [/mw_shl_code]

2.3
将配置好的 zookeeper-3.4.5/ 拷贝到其他节点,在其他节点上一定要修改myid的内容。


2.4
启动集群 - 分别启动Zookeeper - ./zkServer.sh start。



(3)测试:


[mw_shl_code=shell,true][hadoop@master bin]$ ./zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@master bin]$ jps
2917 QuorumPeerMain
2944 Jps
[hadoop@master bin]$ netstat -nltp | grep 2917
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp        0      0 :::52814                    :::*                        LISTEN      2917/java           
tcp        0      0 :::2181                     :::*                        LISTEN      2917/java           
[hadoop@master bin]$ ./zkServer.sh status
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: standalone(我是一台机器,所以这样,要不就是leader/flower)[/mw_shl_code]
  • 补充:

(1)


Zookeeper管理客户存放的数据采用的是类似于文件树的结构,每一个节点叫一个node  /oder-anlalyze(info_data<1M)/aa(info_data<1M)一般Zookeeper中存的是很小的数据,比如说:字符串,数字等。



(2)Zookeeper提供给我们的一些控制台命令:



[mw_shl_code=shell,true][zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path
[zk: localhost:2181(CONNECTED) 1]

[zk: localhost:2181(CONNECTED) 7] create /peerslee 8678678
Created /peerslee

[zk: localhost:2181(CONNECTED) 10] get /peerslee
8678678
cZxid = 0x2
ctime = Wed Mar 16 04:46:06 PDT 2016
mZxid = 0x2
mtime = Wed Mar 16 04:46:06 PDT 2016
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0[/mw_shl_code]

(3)注意:
在集群中,当在master(其中一个节点)中改变peerslee 其他节点会实时(重点)同步如果在一个3个节点的集群中杀死一个,其他节点会正常存活,因为此时还在工作的节点超过默认配置中节点个数的一半,当节点再死一个,此时就会报错。因此,其可靠性是很高的。




hadoop2.x HA高可靠性模式:

  • 概述:



1.1 hadoop 1.0 的单点问题

在hadoop1时代,只有一个NameNode。如果该NameNode数据丢失或者不能工作,那么整个集群就不能恢复了。这是hadoop1中的单点问题,也是hadoop1不可靠的表现。


2016-03-17_160342.png

1.2 hadoop 2.x 对 hadoop 1.x 单点问题的解决

为了解决hadoop1中的单点问题,在hadoop2中新的NameNode不再是只有一个,可以有多个(目前只支持2个)。每一个都有相同的职能。一个是active状态的,一个是standby状态的。当集群运行时,只有active状态的NameNode是正常工作的,standby状态的NameNode是处于待命状态的,时刻同步active状态NameNode的数据。一旦active状态的NameNode不能工作,通过手工或者自动切换,standby状态的NameNode就可以转变为active状态的,就可以继续工作了。这就是高可靠。

1.3 使用JournalNode实现NameNode(Active和Standby)数据的共享

Hadoop2.0中,2个NameNode的数据其实是实时共享的。新HDFS采用了一种共享机制,Quorum Journal Node(JournalNode)集群或者Nnetwork File System(NFS)进行共享。NFS是操作系统层面的,JournalNode是hadoop层面的,我们这里使用JournalNode集群进行数据共享(这也是主流的做法)。如下图所示,便是JournalNode的架构图。


2016-03-17_161753.png

两个NameNode为了数据同步,会通过一组称作JournalNodes的独立进程进行相互通信。当active状态的NameNode的命名空间有任何修改时,会告知大部分的JournalNodes进程。standby状态的NameNode有能力读取JNs中的变更信息,并且一直监控edit log的变化,把变化应用于自己的命名空间。standby可以确保在集群出错时,命名空间状态已经完全同步了。

1.4 NameNode之间的故障切换


对于HA集群而言,确保同一时刻只有一个NameNode处于active状态是至关重要的。否则,两个NameNode的数据状态就会产生分歧,可能丢失数据,或者产生错误的结果。为了保证这点,这就需要利用使用ZooKeeper了。首先HDFS集群中的两个NameNode都在ZooKeeper中注册,当active状态的NameNode出故障时,ZooKeeper能检测到这种情况,它就会自动把standby状态的NameNode切换为active状态。

  • HA 集群搭建

2.1 hadoop2.5.2HA高可靠性集群搭建(hadoop+Zookeeper)

2.2 Hadoop-2.6.0学习笔记(一)HA集群搭建




已有(2)人评论

跳转到指定楼层
helianthus 发表于 2016-3-8 21:00:03
请问这。。。。什么意思呢 ?
回复

使用道具 举报

PeersLee 发表于 2016-3-9 19:19:54
helianthus 发表于 2016-3-8 21:00
请问这。。。。什么意思呢 ?

抱歉,这个正在学习,我对发帖子(删除帖子操作不是很了解)不是很熟练,这是个失误。
有好的资料和资源可以推荐哈,,,
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条