storm
什么是storm:
类似Hadoop的实时数据处理框架
storm和Hadoop的区别
Hadoop storm
数据来源:处理的是hdfs上历史数据, 处理实时新增的数据
处理过程:map阶段 reduce阶段 数据源spout和处理逻辑bolt
处理速度:处理hdfs上TB级数据速度慢 处理新增数据,处理实时
使用场景:不讲究时效,批量处理数据 处理某一新增数据
hadoop是代码执行完结束, storm处理的是数据,不会结束
storm设计思想:
storm是对流stream的抽象,流是一个不间断的无界的连续的tuple.
topology是storm种最高层次的抽象,它被提交到storm集群,
拓扑每个节点都要声明它所发射的元组的字段的name,其他节点订阅接收出处理。
storm中的核心概念
Nimbus: 控制节点,负责集群内分发代码,为worker分配任务,故障检测
Supervisor: 工作节点
Worker: 一个节点可以启动1个或多个worker
Executor: ,一个worker进程运行一个topology的一个或多个excutor线程
Task:一个executor 运行同一个组件的一个或多个task
Spout :组件,消息源,从数据库或者文档中读取数据向topology发送tuple
Bolt :组件,消息处理者,处理输入的数据流并产生新的输出数据流,可执行过滤聚合查询数据库等操作
Topology:集群中运行的一个个job任务。
Tuple:一个tuple就是一个值列表value list,
Stream: 消息流,
Stream Grouping:消息分发策略,定义如何分配给bolt
每个topology默认一个worker,一个superbisor节点最大可以启动4个worker,一个spout或者一个bolt会占用一个executor线程,每个线程executor默认启动一个task;
zookeeper集群负责协调工作,master和worker之间不依赖,而是将信息放到zookeeper,这样可以快速恢复失败一方。
storm本地目录:
目录位置:在配置文件中storm.local.dir: "/opt/storm/tmp"
nimbus 目录下有两个目录
inbox: 客户端上传jar的临时目录
stormdist:存放正在运行所有的jar包;没有任务就为空
jar包下有三个文件:stormjar.jar提交拓扑的jar包
stormcode.ser中包含拓扑中各个组件和拓扑的关系
stormconf.ser保存着conf对象(代码中设置的conf)
supervisor:
在zk上状态:启动zkCli.sh查看
ls /storm下 有workerbeats, errors, supervisors, storms, assignments
ls /storm/storms 目录内容和配置文件中的tmp下的ls nimbus/stormdist/的内容一样
storm安装:(3个节点的集群)
安装zookeeper集群(略)
安装storm:修改配置文件vi storm/conf/storm.yaml注意顶格,之间用空格
storm.zookeeper.servers:
- "192.168.1.160"
- "192.168.1.161"
- "192.168.1.162"
nimbus.host: "192.168.1.160"
storm.local.dir: "/usr/local/storm/tmp"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
复制到其他两个节点:scp -rq storm/ 192.168.1.161:/usr/local/
启动storm:
1.先启动zk:zkServer.sh start (如果时间不一致简单点可以手动修改 date -s 时间/日期)
2.主节点启动nimbus: nohup bin/storm nimbus >/dev/null 2>&1 &
启动ui: nohup bin/storm ui >/dev/null 2>&1 &
注:nohup是为了关掉窗口进程仍然存在,后面的是把那些启动时的日志信息丢到黑洞中
3.从节点启动supervisor: nohup bin/storm supervisor >/dev/null 2>&1 &
启动logviewer: nohup bin/storm logviewer >/dev/null 2>&1 &(可以在浏览器查看日志)
4.浏览器查看192.168.1.160:8080;如果ui界面不能查看日志
原因可能是没启动logviewer进程或者是hosts没有配置将主机名和ip映射
提交一个topology到集群:bin/storm jar 打的jar包 全类名 参数
停止:在ui界面kill
使用命令bin/storm list 查看storm中的所有topology信息
手动bin/storm kill Topology_name
流分组stream Grouping
1.shuffle Grouping :随机分组,可以达到负载均衡
2.Feild Grouping :字段分组,可以根据分组字段把分组内容相同的数据交给同一个线程处理
3.Global Grouping :全局分组,用于汇总,交给一个线程处理
4.All Grouping :广播分组,每个线程都会手动组件发送的数据
5.Non Grouping :不分组,类似shuffle Grouping
6.localOrShuffle Grouping :本地或者随机分组,发射组件和接受组件的线程在同一个worker内部,
只会让这一个线程去接受数据,其他线程不工作
7.custom Grouping : 自定义
并行度rebalance
task是storm最小逻辑单元,task是spout或者nolt的实例,storm中执行的就是task;
task运行在线程中(executor)
线程运行在进程worker中
提高并行度:task-->executor-->worker
1.提高线程(默认一个线程只有一个task)
2.提高task数量,不会直接提高并行度,
后期动态提高storm的并行度
topology提交之后,topology的task不会变了。
3.提高worker数量。
可靠性
worker进程挂掉,storm集群重启一个worker进程
supervisor进程挂掉,不会影响之前提交的topology,这个节点不在是集群一员不会分配任务给他
nimbus进程挂掉,不会影响之前提交的topoligy,不能再向集群提交新的topology
acker消息确认机制:
使用acker线程去监控数据的处理状态,ack();fail()
完全处理:
只有原始的tuple以及它所有衍生的tuple都被处理成功了,spout的ack方法才被调用
只要有一个衍生tuple处理失败,那么spout的fail方法就会被调用
定时任务
1:全局的定时任务
在main方法中通过config设置
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
//表示把定时间隔设置为5秒,这个时候strom每隔5秒就会发射一个系统级别的tuple
2.局部定时任务(经常用;例如实时查询时往往会间隔几秒把把数据存到数据库,实时的话数据太快会看花眼的)
在对应的bolt类中覆盖一个方法,getComponentConfiguration
@Override
public Map<String, Object> getComponentConfiguration() {
HashMap<String, Object> hashMap = new HashMap<String,Object>();
hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//局部定时任务,时间间隔5秒
return hashMap;
}
stormui
主要参考值:Bolts下面的Capacity :这个值接近1时,说明它很忙,需要考虑提高并行度
其他:process latency: bolt收到一个tuple到bolt ack这个tuple的平均时间
execute latency:bolt处理一个tuple的平均时间,不包含acker操作
execute latency和proces latnecy是处理消息的时效性,
而capacity则表示处理能力是否已经饱和,从这3个参数可以知道topology的瓶颈所在
伪代码:
1.单词计数和单词出现总次数
<1>spout 生成数据的spout
需要继承BaseRichSpout;nextuple将监视的文本按行读取并且发射出去(this.collector.emit(new Values(line));)
声明字段line,其他组件可以根据这个字段获取declarer.declare(new Fields("line"));代码如下
public static class MySpout extends BaseRichSpout{
/* Map conf, 配置参数类,保存的是storm的配置信息
* TopologyContext context, 上下文,保存一些全局参数
* SpoutOutputCollector collector,发射器,负责发送spout产生的tuple
*/
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//初始化
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[]{"txt"}, true);
for (File file : listFiles) {
try {
List<String> readLines = FileUtils.readLines(file);
for (String line : readLines) {
this.collector.emit(new Values(line));
}
FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//fields中的参数和values中的参数需要一一对应
declarer.declare(new Fields("line"));
}
<2>bolt 按行分割单词的bolt
prepare初始化方法;execute(Tuple input)执行方法,接收spout发射的tuple。String line = input.getStringByField("line");
这里前面spout按行读取发射,有多少行这里执行多少次并且将分割后的每行的单词发射出去this.collector.emit(new Values(word));
declareOutputFields声明输出字段word
public static class SplitBolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
@Override
public void execute(Tuple input) {
//接收到一行内容,切割出单词
String line = input.getStringByField("line");
String[] words = line.split("\t");
//把单词发射出去
for (String word : words) {
this.collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
<3>Bolt
局部汇总单词的bolt
基本和前面的bolt一样,this.collector.emit(new Values(word,integer));将单词和单词出现的次数都统计发射出去
声明输出字段"word"和"count" ;declarer.declare(new Fields("word","count"));
public static class JuBuBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
@Override
public void execute(Tuple input) {
//获取所有单词进行汇总
String word = input.getStringByField("word");
Integer integer = hashMap.get(word);
if(integer==null){
integer=0;
}
integer++;
hashMap.put(word, integer);
this.collector.emit(new Values(word,integer));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
<4>Bolt 获取所有单词汇总这里不需要往外发射tuple所以declareOutputFields方法为空
public static class AllCountBolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
@Override
public void execute(Tuple input) {
//获取所有单词进行汇总
String word = input.getStringByField("word");
Integer count = input.getIntegerByField("count");
hashMap.put(word, count);
Utils.sleep(1000);
System.err.println("===============================");
int num =0;
System.out.println("所有单词去重后的个数:"+hashMap.size());
for (Entry<String, Integer> entry : hashMap.entrySet()) {
System.out.println(entry);
num+=entry.getValue();
}
System.out.println("所有单词出现的总次数:"+num);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
<5>main
集群上一个topology就是StormTopology;
一个task 就是new MySpout();new MyBolt()
分组策略就是shuffleGrouping("spout1")随机分,达到负载均衡
fieldsGrouping("bolt1", new Fields("word"))相同的单词都会进入同一线程处理进行统计次数
globalGrouping("bolt2"),保证所有都汇总到一个线程统计总次数
config的信息就是被保存在ls nimbus/stormdist/stormconf.ser
提交到集群StormSubmitter.submitTopology
提交到本地localCluster.submitTopology
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout1", new MySpout());
topologyBuilder.setBolt("bolt1", new SplitBolt(),2).shuffleGrouping("spout1");
topologyBuilder.setBolt("bolt2", new JuBuBolt(),2).fieldsGrouping("bolt1", new Fields("word"));
topologyBuilder.setBolt("bolt3", new AllCountBolt()).globalGrouping("bolt2");
//创建topology
StormTopology createTopology = topologyBuilder.createTopology();
//设置storm的参数信息
Config config = new Config();
String topology_name = RemoteStormTopology.class.getSimpleName();
if(args.length==0){
//创建本地storm集群
LocalCluster localCluster = new LocalCluster();
//提交topology
localCluster.submitTopology(topology_name, config, createTopology);
}else{
try {
//提交到集群运行
StormSubmitter.submitTopology(topology_name, config, createTopology);
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
}
2.统计网站PV、UV、DV
类似这样的日志文档27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
soput读取每行数据发射出去
bolt1分割每行将ip发射出去
bolt2局部汇总,将ip放到hashmap中并ip出现的次数累加
最后将ip和ip出现的次数发射出去,同时声明字段ip和次数
String ip = input.getStringByField("ip");
Integer i = hashMap.get(ip);
if(i==null){
i=0;
}
i++;
hashMap.put(ip, i);
this.collector.emit(new Values(ip,i));
bolt3全局汇总:遍历map
HashMap<String, Integer> hashMap = new HashMap<String,Integer>();
public void execute(Tuple input) {
String ip = input.getStringByField("ip");
Integer count = input.getIntegerByField("count");
hashMap.put(ip, count);
int num =0;
System.out.println("UV个数:"+hashMap.size());
for (Entry<String, Integer> entry : hashMap.entrySet()) {
System.out.println(entry+"DV:"+entry.getValue() );
num+=entry.getValue();
}
System.out.println(" PV:"+num);
}
main :
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new MySpout());
topologyBuilder.setBolt("bolt1", new SplitBolt()).shuffleGrouping("spout1");
topologyBuilder.setBolt("bolt2", new JuBuBolt()).fieldsGrouping("bolt1", new Fields("ip"));
topologyBuilder.setBolt("bolt3", new AllCountBolt()).globalGrouping("bolt2");
其他代码:
acker机制代码实现:
1.在spout中的nextTuple()方法中发射时开启acker机制覆盖ack(Object msgId) 和fail(Object msgId) 方法
通过设置messageid开启消息确认机制
* messageid如何赋值呢?
* 注意:这个messageid的值一般设置为tuple里面封装的这一条数据的唯一id【在这要保证messageid和tuple中数据的唯一性,需要一一对应】
* messageid和tuple中内容的关系需要程序员来维护
* 例如:mysql article表:id content
messageid = num;
this.collector.emit(new Values("哈 哈 呵 呵"),messageid);
2.在bolt的execute(Tuple input)方法中进行处理
String word = input.getStringByField("word");
if(word.equals("哈")){
this.collector.fail(input);
}else{
this.collector.ack(input);
}
|
|