storm组件复杂组合
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout1", new MySpout());
//多个组件接收同一个组件的输出
topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1");
topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("spout1");
topologyBuilder.setBolt("bolt3", new MyBolt3()).shuffleGrouping("spout1");
//一个组件接收多个组件的输出数据
topologyBuilder.setBolt("bolt4", new MyBolt4()).shuffleGrouping("bolt1").shuffleGrouping("bolt2").shuffleGrouping("bolt3");
----------------------------------------------------------------------
hadoop rpc介绍及代码实现
其他文档
storm drpc分布式远程过程调用
可以实现跨主机夸进程访问
本地模式
远程模式
原理:DRPC服务器与topology在同一主机上,client请求访问链接DRPC服务器,
然后使用系统内置的DRPCSpout传输数据,然后用户自定义bolt,最后系统内置“ReturnResults”的bolt连接到DRPC服务器,
将结果返回给client
client->DRPCServer->DRPCSpout->自定义bolt->ReturnResultBolt->DRPCServe->client
客户端发送功能名称及功能所需参数到DRPC服务器去执行。
拓扑它使用DRPCSpout从DRPC服务器接收功能调用流。
每个功能调用通过DRPC服务器使用唯一ID标记,随后拓扑计算结果,
在拓扑的最后,一个称之为“ReturnResults”的bolt连接到DRPC服务器,
把结果交给这个功能调用(根据功能调用ID),
DRPC服务器根据ID找到等待中的客户端,
为等待中的客户端消除阻塞,并发送结果给客户端。
drpc代码实现
/**
* drpc原理:client->DRPCServer->DRPCSpout->自定义bolt->ReturnResultBolt->DRPCServe->client
* 单词拼接的功能,功能名称fun_name
* 用户发送过来一个字符串,返回一个拼接了hello的字符串
*
*
*/
public class LocalDrpcTopology {
public static class MyBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
* DRPCSpout为client传来的数据做了封装发射出的输出流是[id, result]格式
* tuple中封装了两个参数
* 第一个:表示参数的id
* 第二个:表示具体的参数内容
*/
@Override
public void execute(Tuple input) {
String value = input.getString(1);
value = "hello "+value;
this.collector.emit(new Values(input.getValue(0),value));//tuple封装了两个所发射出去两个
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","value"));//字段随便命名,ReturnResultBolt根据下标处理
}
}
public static void main(String[] args) {
LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("fun_name");//drpc对外提供一个功能,名称是fun_name
topologyBuilder.addBolt(new MyBolt());
//定义本地集群
LocalCluster localCluster = new LocalCluster();
String simpleName = LocalDrpcTopology.class.getSimpleName();
//创建本地drpc服务端
ILocalDRPC drpc = new LocalDRPC();
localCluster.submitTopology(simpleName, new Config(), topologyBuilder.createLocalTopology(drpc));
//模拟客户端访问,由于是模拟本地体现不出rpc的跨主机特点;。
String result = drpc.execute("fun_name", " storm");//第一个参数是drpc的函数名
System.err.println(result);
}
}
当创建LinearDRPCTopologyBuilder时,你把这个拓扑的DRPC功能名称告诉storm。
一个DRPC服务器可以协调许多功能,功能名称用于区别不同的功能,
首先声明的bolt将接收一个输入的2-tuples,第一个字段是请求ID,第二个字段是请求参数。
LinearDRPCTopologyBuilder认为最后的bolt会发射一个输出流,该输出流包含[id, result]格式的2-tuples。
最后,所有拓扑中间过程产生的元组(tuple)都包含请求id作为其第一个字段。
参考http://blog.chinaunix.net/uid-233938-id-3198826.html
storm优化
并行度:
一般worker和线程executor的比列1:10~15
worker:
根据cpu核数和处理的数据,来调整worker数量
一般一个topology使用12个worker比较合理
如果多增加,线程之间的内存通信变成进程之间的网络通信
acker:
如果不需要可靠性可以不跟踪tuple树。
代码设置
雪崩问题:
如果spout发射速度大于接受速度,数据累积,消耗内存,最终崩溃
解决办法:
增加bolt的处理能力
代码设置config.setMaxSpoutPending(num);当bolt还有设置的num个tuple没处理时
spout停止发射,当bolt消耗了之后tuple个数少于num,spout继续发射数据,
需要开启acker消息确认机制。
storm脚本启动关闭
1:一键启动集群中所有节点的进程
提前定义一个文件,文件名叫 supervisorhost 这个文件中保存所有从节点的ip信息,这个文件在storm的bin目录下
start-all.sh:这个脚本需要在主节点执行,脚本放在storm的bin目录下
vi supervisorhost
192.168.1.100
192.168.1.101
vi start-all.sh
#!/bin/bash
source /etc/profile
echo "start nimbus and ui on localhost"
nohup /usr/local/storm/bin/storm nimbus >/dev/null 2>&1 &
nohup /usr/local/storm/bin/storm ui >/dev/null 2>&1 &
cat supervisorhost | while read ip
do
echo "start supervisor and logviewer on $ip"
ssh $ip nohup /usr/local/storm/bin/storm supervisor >/dev/null 2>&1 &
ssh $ip nohup /usr/local/storm/bin/storm logviewer >/dev/null 2>&1 &
done
2:实现一键停止脚本
在主节点写一个stop-all.sh 在storm的bin目录下
vi stop-all.sh
#!/bin/bash
source /etc/profile
echo "stop nimbus and ui on localhost"
kill -9 `ps -ef|grep backtype.storm.daemon.nimbus | awk '{print $2}' | head -1` &
kill -9 `ps -ef|grep backtype.storm.ui.core | awk '{print $2}' | head -1` &
cat supervisorhost | while read ip
do
echo "stop supervisor and logviewer on $ip"
ssh $ip /usr/local/storm/bin/stop-supervisor.sh >/dev/null 2>&1 &
done
vi stop-supervisor.sh
#!/bin/bash
kill -9 `ps -ef|grep backtype.storm.daemon.supervisor | awk '{print $2}' | head -1` &
kill -9 `ps -ef|grep backtype.storm.daemon.logviewer | awk '{print $2}' | head -1` &
必须配置免密码登录了,通过ssh 启动其他服务器的脚本(no-login方式),如果这个脚本中用到了类似java这种环境变量的话,需要修改对应服务器的 ~/.bashrc文件,
在这个文件中 添加一行 source /etc/profile 否则会找不到java命令
storm日志
logback和log4j作为两套slf4j-api日志框架的实现
不能共同使用,不然会报日志冲突的错误
解决办法:
如果引入的第三方jar包中包含了log4j相关的包,需要在pom中排除掉
排除之后在本地运行的话可能会没有日志输出,可以单独引用log4j的相关依赖,把scope指定为provided
拓展:
echarts分布图 将数据在前端展现
jstorm 阿里使用Java编写的storm
trident :对storm的封装
在storm的基础上进行高层抽象,提供了过滤聚合等适合流式计算的函数
函数(function)
过滤器(filter)
连接(meger)
流分组(group by)
聚合(aggregate)
trident 例子:单词计数
public class TridentWordCount {
public static class MySpout implements IBatchSpout {
Fields fields;
HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
public MySpout(Fields fields) {
this.fields = fields;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
List<List<Object>> batch = this.batches.get(batchId);
if(batch == null){
batch = new ArrayList<List<Object>>();
Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[]{"txt"}, true);
for (File file : listFiles) {
List<String> readLines;
try {
readLines = FileUtils.readLines(file);
for (String line : readLines) {
batch.add(new Values(line));
}
FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
} catch (IOException e) {
e.printStackTrace();
}
}
if(batch.size()>0){
this.batches.put(batchId, batch);
}
}
for(List<Object> list : batch){
collector.emit(list);
}
}
@Override
public void ack(long batchId) {
this.batches.remove(batchId);
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
@Override
public Fields getOutputFields() {
return fields;
}
}
public static class splitBolt extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String line = tuple.getString(0);
String[] words = line.split("\t");
for (String word : words) {
collector.emit(new Values(word));
}
}
}
public static class MyAgge extends BaseAggregator<Map<String, Integer>>{
@Override
public Map<String, Integer> init(Object batchId,
TridentCollector collector) {
return new HashMap<String,Integer>();
}
@Override
public void aggregate(Map<String, Integer> val, TridentTuple tuple,
TridentCollector collector) {
String word = tuple.getString(0);
Integer integer = val.get(word);
if(integer==null){
integer=0;
}
integer++;
val.put(word, integer);
}
@Override
public void complete(Map<String, Integer> val,
TridentCollector collector) {
collector.emit(new Values(val));
}
}
public static class PrintBolt extends BaseFunction{
HashMap<String, Integer> allMap = new HashMap<String,Integer>();
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
Map<String, Integer> map = (Map<String, Integer>)tuple.getValue(0);
for (Entry<String, Integer> entry : map.entrySet()) {
String key = entry.getKey();
Integer value = entry.getValue();
Integer integer = allMap.get(key);
if(integer==null){
integer=0;
}
allMap.put(key, value+integer);
}
Utils.sleep(1000);
for (Entry<String, Integer> entry : allMap.entrySet()) {
System.out.println(entry);
}
}
}
public static void main(String[] args) {
TridentTopology tridentTopology = new TridentTopology();
tridentTopology.newStream("spoutid", new MySpout(new Fields("sentence")))
.each(new Fields("sentence"), new splitBolt(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new MyAgge(), new Fields("map"))
.each(new Fields("map"), new PrintBolt(), new Fields(""));
LocalCluster localCluster = new LocalCluster();
String simpleName = TridentWordCount2.class.getSimpleName();
localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
}
}
|
|