Questions addressed:
1. How do you implement an inverted index with MapReduce?
2. What is ZooKeeper?
3. How do you install and run ZooKeeper?
4. What is Hadoop HA (High Availability) mode?
Solution:
Inverted index with MapReduce:
Code (step one):
[mw_shl_code=java,true]package peerslee.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndexStepOne {

    // Mapper:
    // extract the file name, split each line into words, and emit
    // <word->filename, 1> for every word occurrence.
    public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // one input line per call
            String line = value.toString();
            // split on spaces (mind the separator character)
            String[] fields = StringUtils.split(line, ' ');
            // the input split tells us which file this line came from
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // emit <word->filename, 1> once per word
            for (String field : fields) {
                context.write(new Text(field + "->" + fileName), new LongWritable(1));
            }
        }
    }

    // Reducer: sum the occurrence counts for each <word->filename> key.
    public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    // Runner
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InverseIndexStepOne.class);
        job.setMapperClass(StepOneMapper.class);
        job.setReducerClass(StepOneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // if the output path already exists, remove it from HDFS
        Path outputPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]
Test:
[mw_shl_code=shell,true][hadoop@master ~]$ hadoop fs -ls /ii
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2016-03-15 02:16 /ii/ii_input
[hadoop@master ~]$ hadoop jar ii.jar peerslee.hadoop.mr.ii.InverseIndexStepOne /ii/ii_input /ii/ii_output
16/03/15 02:18:45 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/15 02:18:45 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/15 02:18:46 INFO input.FileInputFormat: Total input paths to process : 3
16/03/15 02:18:46 INFO mapreduce.JobSubmitter: number of splits:3
16/03/15 02:18:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458032748939_0001
16/03/15 02:18:47 INFO impl.YarnClientImpl: Submitted application application_1458032748939_0001
16/03/15 02:18:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1458032748939_0001/
16/03/15 02:18:47 INFO mapreduce.Job: Running job: job_1458032748939_0001
16/03/15 02:18:58 INFO mapreduce.Job: Job job_1458032748939_0001 running in uber mode : false
16/03/15 02:18:58 INFO mapreduce.Job: map 0% reduce 0%
16/03/15 02:19:11 INFO mapreduce.Job: map 100% reduce 0%
16/03/15 02:19:22 INFO mapreduce.Job: map 100% reduce 100%
16/03/15 02:19:23 INFO mapreduce.Job: Job job_1458032748939_0001 completed successfully
16/03/15 02:19:23 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=970
FILE: Number of bytes written=372369
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=547
HDFS: Number of bytes written=499
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=33995
Total time spent by all reduces in occupied slots (ms)=7410
Total time spent by all map tasks (ms)=33995
Total time spent by all reduce tasks (ms)=7410
Total vcore-seconds taken by all map tasks=33995
Total vcore-seconds taken by all reduce tasks=7410
Total megabyte-seconds taken by all map tasks=34810880
Total megabyte-seconds taken by all reduce tasks=7587840
Map-Reduce Framework
Map input records=13
Map output records=43
Map output bytes=878
Map output materialized bytes=982
Input split bytes=303
Combine input records=0
Combine output records=0
Reduce input groups=34
Reduce shuffle bytes=982
Reduce input records=43
Reduce output records=34
Spilled Records=86
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=704
CPU time spent (ms)=2250
Physical memory (bytes) snapshot=534474752
Virtual memory (bytes) snapshot=1449422848
Total committed heap usage (bytes)=378744832
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=244
File Output Format Counters
Bytes Written=499
[hadoop@master ~]$ hadoop fs -ls /ii/ii_output
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2016-03-15 02:19 /ii/ii_output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 499 2016-03-15 02:19 /ii/ii_output/part-r-00000
[hadoop@master ~]$ hadoop fs -cat /ii/ii_output/part-r-00000
Happiness->C.txt 1
May->B.txt 1
There->A.txt 1
along->C.txt 1
are->A.txt 1
cry->C.txt 1
enough->B.txt 2
for->C.txt 1
happiness->B.txt 1
have->B.txt 1
human->B.txt 1
hurt->C.txt 1
in->A.txt 1
keep->B.txt 1
lies->C.txt 1
life->A.txt 1
make->B.txt 2
miss->A.txt 1
moments->A.txt 1
much->A.txt 1
so->A.txt 1
someone->A.txt 1
sorrow->B.txt 1
strong,enough->B.txt 1
sweet->B.txt 1
their->C.txt 1
those->C.txt 2
to->B.txt 3
trials->B.txt 1
way->C.txt 1
when->A.txt 1
who->C.txt 2
you->A.txt 1
you->B.txt 4[/mw_shl_code]
————————————————————————————————————————
Code (step two):
[mw_shl_code=java,true]package peerslee.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InversIndexStepTwo {

    // Mapper: input lines from step one have the format <word->filename \t count>
    public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String[] word_filename = StringUtils.split(fields[0], "->");
            // word, filename, count
            String word = word_filename[0];
            String filename = word_filename[1];
            long count = Long.parseLong(fields[1]);
            // emit <word, filename->count>
            context.write(new Text(word), new Text(filename + "->" + count));
        }
    }

    // Reducer: for each word, concatenate its <filename->count> postings from all files
    public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String result = "";
            for (Text value : values) {
                result += value + " ";
            }
            // output format: <word  filename->count filename1->count1 ...>
            context.write(key, new Text(result));
        }
    }

    // Runner
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InversIndexStepTwo.class);
        job.setMapperClass(StepTwoMapper.class);
        job.setReducerClass(StepTwoReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, args[0]);

        // if the output path already exists, remove it from HDFS
        Path outputPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]
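Step two reads the output of step one, so the two jobs have to run in sequence. Given the step-one output shown above, step two would collapse, for example, the two "you" lines into a single posting list such as "you  A.txt->1 B.txt->4". Below is a minimal driver sketch that chains the two jobs; the class name InverseIndexRunner, the job names, and the three path arguments are illustrative, and it assumes the two classes above are packaged in the same jar.
[mw_shl_code=java,true]package peerslee.hadoop.mr.ii;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndexRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);       // e.g. /ii/ii_input
        Path stepOneOut = new Path(args[1]);  // e.g. /ii/ii_output
        Path stepTwoOut = new Path(args[2]);  // e.g. /ii/ii_output2 (illustrative)

        // step one: emit <word->filename, count>
        // (cleanup of pre-existing output directories is omitted for brevity)
        Job one = Job.getInstance(conf, "ii-step-one");
        one.setJarByClass(InverseIndexStepOne.class);
        one.setMapperClass(InverseIndexStepOne.StepOneMapper.class);
        one.setReducerClass(InverseIndexStepOne.StepOneReducer.class);
        one.setOutputKeyClass(Text.class);
        one.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(one, input);
        FileOutputFormat.setOutputPath(one, stepOneOut);
        if (!one.waitForCompletion(true)) {
            System.exit(1);
        }

        // step two: merge the per-file counts into <word, filename->count filename->count ...>
        Job two = Job.getInstance(conf, "ii-step-two");
        two.setJarByClass(InversIndexStepTwo.class);
        two.setMapperClass(InversIndexStepTwo.StepTwoMapper.class);
        two.setReducerClass(InversIndexStepTwo.StepTwoReducer.class);
        two.setOutputKeyClass(Text.class);
        two.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(two, stepOneOut);
        FileOutputFormat.setOutputPath(two, stepTwoOut);
        System.exit(two.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]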
Test: run it the same way as step one, with the step-one output directory (/ii/ii_output) as the input.
ZooKeeper:
- Simplicity: the core of ZooKeeper is a stripped-down file system that offers a few simple operations together with some extra abstractions such as ordering and notifications.
- High availability: ZooKeeper runs on a group of machines and helps a system avoid single points of failure, so it can be used to build reliable applications.
- Loosely coupled interaction: participants in a ZooKeeper-mediated exchange do not need to know about each other.
- ZooKeeper is a library: it provides an open-source, shared repository of implementations of common coordination patterns.
Suppose every application instance (client) in a cluster stores the same configuration. When that configuration changes on one node while the cluster is running, every node must end up with the same data. Instead of keeping a copy of the configuration on each node, we can push it into ZooKeeper and have all nodes read it from there; the ZooKeeper ensemble then guarantees that the whole application cluster sees a consistent view of that configuration.
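A minimal sketch of that pattern with the ZooKeeper Java client (the connection string master:2181 and the znode path /config are assumptions for illustration, not from the post): each client reads the shared configuration from a znode and registers a watch so it is notified whenever the data changes. Watches are one-shot in ZooKeeper, which is why readConfig() re-registers the watch on every read.
[mw_shl_code=java,true]import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConfigWatcher implements Watcher {
    // connection string and znode path are illustrative, not from the post
    private static final String CONNECT = "master:2181";
    private static final String CONFIG_PATH = "/config";

    private ZooKeeper zk;

    public void start() throws Exception {
        zk = new ZooKeeper(CONNECT, 5000, this);
    }

    // read the shared configuration and leave a watch registered for the next change
    private void readConfig() throws Exception {
        byte[] data = zk.getData(CONFIG_PATH, true, null);
        System.out.println("current config: " + new String(data, "UTF-8"));
    }

    @Override
    public void process(WatchedEvent event) {
        try {
            if (event.getState() == Event.KeeperState.SyncConnected
                    && event.getType() == Event.EventType.None) {
                readConfig(); // first read once the session is established
            } else if (event.getType() == Event.EventType.NodeDataChanged) {
                readConfig(); // /config was updated somewhere in the cluster
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigWatcher watcher = new ConfigWatcher();
        watcher.start();
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive watch events
    }
}
[/mw_shl_code]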
Installing ZooKeeper and basic usage:
(1) Unpack:
[mw_shl_code=shell,true]tar -xvzf zookeeper-3.4.5.tar.gz -C ~/app/[/mw_shl_code]
(2) Configure:
2.1
Create a zoo.cfg configuration file under $ZOOKEEPER/conf:
[mw_shl_code=shell,true]
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg

# settings to adjust:
# heartbeat interval (ms)
tickTime=2000
# at startup, a peer that fails to respond within 10 heartbeats is considered to have failed to start
initLimit=10
# at runtime, a peer that fails to respond within 5 heartbeats is considered out of sync
syncLimit=5
dataDir=/home/hadoop/app/zookeeper-3.4.5/data
# port clients connect to
clientPort=2181
# machines in the ensemble
# 2888: port for communication between the leader and the followers
# 3888: port used for leader election -> the candidate with the majority of votes becomes leader
server.1=master:2888:3888[/mw_shl_code]
2.2
Inside the dataDir from zoo.cfg (dataDir=/home/hadoop/app/zookeeper-3.4.5/data), create a myid file and write 1 into it (the other nodes get 2, 3, ...; the IDs are up to you).
[mw_shl_code=shell,true][hadoop@master zookeeper-3.4.5]$ mkdir data
[hadoop@master zookeeper-3.4.5]$ echo 1 > data/myid
[hadoop@master zookeeper-3.4.5]$ cat data/myid
1
[hadoop@master zookeeper-3.4.5]$ pwd
/home/hadoop/app/zookeeper-3.4.5
[hadoop@master zookeeper-3.4.5]$ [/mw_shl_code]
2.3
Copy the configured zookeeper-3.4.5/ directory to the other nodes, and be sure to change the contents of myid on each of them.
2.4
Start the ensemble by running ./zkServer.sh start on each node.
(3) Test:
[mw_shl_code=shell,true][hadoop@master bin]$ ./zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@master bin]$ jps
2917 QuorumPeerMain
2944 Jps
[hadoop@master bin]$ netstat -nltp | grep 2917
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 :::52814 :::* LISTEN 2917/java
tcp 0 0 :::2181 :::* LISTEN 2917/java
[hadoop@master bin]$ ./zkServer.sh status
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: standalone (only one machine here, hence standalone; in a multi-node ensemble this would show leader or follower)[/mw_shl_code]
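Besides jps and netstat, a ZooKeeper server can also be probed programmatically with its four-letter-word commands over the client port: sending ruok to a healthy server returns imok. A small Java sketch, assuming the host and clientPort from the zoo.cfg above:
[mw_shl_code=java,true]import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class RuokCheck {
    public static void main(String[] args) throws Exception {
        // host and port taken from the zoo.cfg above; adjust for other nodes
        try (Socket socket = new Socket("master", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes("US-ASCII"));
            out.flush();
            socket.shutdownOutput();

            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4];
            int n = in.read(buf);
            // a healthy server answers "imok"
            System.out.println(n > 0 ? new String(buf, 0, n, "US-ASCII") : "no response");
        }
    }
}
[/mw_shl_code]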
(1)
ZooKeeper stores client data in a tree structure similar to a file system. Each node of the tree is called a znode, e.g. /order-analyze/aa, and each znode holds only a small amount of data (less than 1 MB), typically strings or numbers.
(2) Some of the console commands ZooKeeper provides:
[mw_shl_code=shell,true][zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
connect host:port
get path [watch]
ls path [watch]
set path data [version]
rmr path
delquota [-n|-b] path
quit
printwatches on|off
create [-s] [-e] path data acl
stat path [watch]
close
ls2 path [watch]
history
listquota path
setAcl path acl
getAcl path
sync path
redo cmdno
addauth scheme auth
delete path [version]
setquota -n|-b val path
[zk: localhost:2181(CONNECTED) 1]
[zk: localhost:2181(CONNECTED) 7] create /peerslee 8678678
Created /peerslee
[zk: localhost:2181(CONNECTED) 10] get /peerslee
8678678
cZxid = 0x2
ctime = Wed Mar 16 04:46:06 PDT 2016
mZxid = 0x2
mtime = Wed Mar 16 04:46:06 PDT 2016
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0[/mw_shl_code]
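The same create and get operations are also available from the ZooKeeper Java API. A minimal sketch mirroring the CLI session above (connection string, znode path, and data value are taken from that session; everything else is illustrative; the create call fails with NodeExistsException if the znode already exists):
[mw_shl_code=java,true]import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkCreateGet {
    public static void main(String[] args) throws Exception {
        // wait until the session is actually connected before issuing requests
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await();

        // equivalent of: create /peerslee 8678678
        zk.create("/peerslee", "8678678".getBytes("UTF-8"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // equivalent of: get /peerslee
        Stat stat = new Stat();
        byte[] data = zk.getData("/peerslee", false, stat);
        System.out.println(new String(data, "UTF-8") + " (dataVersion=" + stat.getVersion() + ")");

        zk.close();
    }
}
[/mw_shl_code]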
(3) Note:
In a cluster, when /peerslee is changed on master (one of the nodes), the other nodes are synchronized in real time (this is the key point). If one node of a three-node ensemble is killed, the remaining nodes keep working, because the surviving nodes still make up more than half of the configured servers; kill a second node and the ensemble starts reporting errors. This quorum rule is what gives ZooKeeper its high reliability.
Hadoop 2.x HA (high availability) mode:
1.1 The single point of failure in Hadoop 1.x
In the Hadoop 1.x era there was only one NameNode. If that NameNode lost its data or stopped working, the whole cluster could not recover. This is the single point of failure in Hadoop 1.x, and the main reason it is considered unreliable.
1.2 How Hadoop 2.x addresses the Hadoop 1.x single point of failure
To solve this, Hadoop 2.x no longer has just one NameNode: there can be several (currently only two are supported), each with the same responsibilities. One is in the active state and the other in standby. While the cluster is running, only the active NameNode serves requests; the standby NameNode waits on hot standby and continuously synchronizes the active NameNode's data. As soon as the active NameNode stops working, the standby NameNode can be switched to active, manually or automatically, and carry on the work. This is what high availability means here.
1.3 Sharing NameNode data (active and standby) through JournalNodes
In Hadoop 2.x the data of the two NameNodes is in fact shared in real time. HDFS uses a shared-storage mechanism for this: either a JournalNode cluster (the Quorum Journal Manager) or a Network File System (NFS). NFS operates at the operating-system level, while JournalNodes operate at the Hadoop level; here we use a JournalNode cluster, which is also the mainstream choice. The JournalNode-based architecture works as follows.
To stay synchronized, the two NameNodes communicate through a group of independent processes called JournalNodes. Whenever the namespace of the active NameNode is modified, the change is recorded on a majority of the JournalNodes. The standby NameNode reads these changes from the JournalNodes, keeps monitoring the edit log, and applies the changes to its own namespace, which ensures that its namespace state is fully synchronized if the cluster has to fail over.
1.4 Failover between NameNodes
For an HA cluster it is essential that only one NameNode is active at any given moment. Otherwise the state of the two NameNodes would diverge, and data could be lost or wrong results produced. This is where ZooKeeper comes in: both NameNodes of the HDFS cluster register with ZooKeeper, and when the active NameNode fails, ZooKeeper detects the failure and automatically switches the standby NameNode to active.
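From the client side, an HA cluster is addressed through a logical nameservice rather than a single NameNode host, and a failover proxy provider picks whichever NameNode is currently active. A minimal sketch of that client configuration in Java (the nameservice name ns1, the NameNode host names, and the port are assumptions; in a real cluster these properties normally live in hdfs-site.xml and core-site.xml):
[mw_shl_code=java,true]import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // a logical nameservice replaces the single NameNode host:port
        conf.set("fs.defaultFS", "hdfs://ns1");
        conf.set("dfs.nameservices", "ns1");
        conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.ns1.nn1", "master:9000");
        conf.set("dfs.namenode.rpc-address.ns1.nn2", "master2:9000");
        // the failover proxy provider figures out which NameNode is currently active
        conf.set("dfs.client.failover.proxy.provider.ns1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // the client only ever talks to "ns1"; failover between nn1 and nn2 is transparent
        FileSystem fs = FileSystem.get(new URI("hdfs://ns1"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}
[/mw_shl_code]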
2.1 Building a Hadoop 2.5.2 HA cluster (Hadoop + ZooKeeper)
2.2 Hadoop 2.6.0 study notes (part 1): setting up an HA cluster