把结果输出到什么地方了,是由程序来定义的。
比如是保存到文件,还是保存到 HBase,所以数据存放在什么地方取决于程序。
比如下面例子:
就保存在了hbase
Storm 0.8.1 的(mapreduce)Spout/Bolt 编程实例详解
而如果你是到文件中去找结果,那肯定是找不到的了。
具体的保存代码如下,你可以参考:
package com.ygc.mobilenet;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.inspur.hadoop.hbase.HTableClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
/**
 * Bolt that saves every mobile-network access record it receives into HBase and then
 * forwards the record unchanged to downstream components.
 *
 * <p>Reliability contract: a tuple is acked only after all of its columns have been
 * written and it has been re-emitted (anchored to the input). If the write or the emit
 * fails, the tuple is failed instead, so that a spout supporting replay can resend it.
 */
public class MobileNetLogSaveBolt implements IRichBolt {

    /** Shared logger; static so it is not carried around as per-instance bolt state. */
    private static final Log LOG = LogFactory.getLog(MobileNetLogSaveBolt.class);

    /**
     * Address handed to the {@link HTableClient} constructor.
     * NOTE(review): hard-coded; presumably the HBase/ZooKeeper host - confirm and
     * consider moving it into the topology configuration.
     */
    private static final String HBASE_ADDRESS = "192.168.1.230";

    /** Table that receives one row per access record. */
    private static final String TABLE_NAME = "t_mobilenet_log";

    // Third argument of HTableClient.insertRow; presumably the column family - confirm.
    private static final String GROUP_TIME = "TIME";
    private static final String GROUP_SOURCE_INFO = "SOURCE_INFO";
    private static final String GROUP_DEST_INFO = "DEST_INFO";
    private static final String GROUP_BASIC_INFO = "BASIC_INFO";

    // Runtime-only resources assigned in prepare(); transient because they are never
    // meant to be part of a serialized bolt instance.
    private transient OutputCollector outputCollector;
    private transient HTableClient tableClient;

    /**
     * Stores the collector and opens the HBase client.
     *
     * <p>A failure to create the client is logged (with stack trace) rather than
     * rethrown; in that case {@code tableClient} stays {@code null} and every tuple is
     * subsequently failed by {@link #execute(Tuple)}.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context (unused)
     * @param outputCollector collector used to emit, ack and fail tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        try {
            // Initialise the HBase client.
            tableClient = new HTableClient(HBASE_ADDRESS);
        } catch (Exception e) {
            LOG.error("Failed to initialise HBase client for " + HBASE_ADDRESS, e);
        }
    }

    /**
     * Saves the tuple to HBase, forwards it downstream, and acks it; fails the tuple if
     * saving or emitting did not succeed.
     *
     * @param tuple the incoming access record
     */
    @Override
    public void execute(Tuple tuple) {
        boolean processed = false;
        try {
            if (LOG.isInfoEnabled()) {
                // Guarded so the string is not built for every tuple when INFO is off.
                LOG.info("access tuple " + tuple.getValues() + "files info = " + tuple.getFields().toString());
            }
            if (saveTupleToHBase(tuple)) {
                // Anchor the outgoing tuple to the input so downstream failures propagate.
                outputCollector.emit(tuple, tuple.getValues());
                processed = true;
            }
        } catch (Exception e) {
            LOG.error("Failed to process tuple", e);
        } finally {
            // Exactly one of ack/fail is reported for every tuple.
            if (processed) {
                outputCollector.ack(tuple);
            } else {
                outputCollector.fail(tuple);
            }
        }
    }

    /**
     * Writes every field of the received tuple into the HBase table, one cell per field.
     *
     * @param tuple data received from an upstream component (per the original note, the
     *              stream emitted by com.ygc.mobilenet.MobileNetLogAnalyseSpout); its
     *              field names must match the {@code MobileNetLogVal} constants read here
     * @return {@code true} if every cell was written; {@code false} if the client is
     *         unavailable or any write threw (the error is logged)
     */
    private boolean saveTupleToHBase(Tuple tuple) {
        if (tableClient == null) {
            LOG.error("HBase client is not initialised; tuple cannot be saved");
            return false;
        }
        try {
            String rowKey = createRowKeyByTuple(tuple);
            // One timestamp for the whole record so all of its cells share a version.
            long ts = System.currentTimeMillis();

            putField(rowKey, GROUP_TIME, MobileNetLogVal.START_TIME_FIELD, ts, tuple);
            putField(rowKey, GROUP_TIME, MobileNetLogVal.RESPONSE_TIME_FIELD, ts, tuple);
            putField(rowKey, GROUP_TIME, MobileNetLogVal.END_TIME_FIELD, ts, tuple);

            putField(rowKey, GROUP_SOURCE_INFO, MobileNetLogVal.SOURCE_DEV_IP_FIELD, ts, tuple);
            putField(rowKey, GROUP_SOURCE_INFO, MobileNetLogVal.SOURCE_USER_IP_FIELD, ts, tuple);
            putField(rowKey, GROUP_SOURCE_INFO, MobileNetLogVal.SOURCE_PORT_FIELD, ts, tuple);

            putField(rowKey, GROUP_DEST_INFO, MobileNetLogVal.DEST_DEV_IP_FIELD, ts, tuple);
            putField(rowKey, GROUP_DEST_INFO, MobileNetLogVal.DEST_USER_IP_FIELD, ts, tuple);
            putField(rowKey, GROUP_DEST_INFO, MobileNetLogVal.DEST_PORT_FIELD, ts, tuple);

            putField(rowKey, GROUP_BASIC_INFO, MobileNetLogVal.APN_FIELD, ts, tuple);
            putField(rowKey, GROUP_BASIC_INFO, MobileNetLogVal.IMSI_FIELD, ts, tuple);
            putField(rowKey, GROUP_BASIC_INFO, MobileNetLogVal.MSISDN_FIELD, ts, tuple);
            putField(rowKey, GROUP_BASIC_INFO, MobileNetLogVal.URL_FIELD, ts, tuple);
            putField(rowKey, GROUP_BASIC_INFO, MobileNetLogVal.HOST_FIELD, ts, tuple);
        } catch (Exception e) {
            LOG.error("Failed to save tuple to HBase table " + TABLE_NAME, e);
            return false;
        }
        return true;
    }

    /**
     * Writes a single tuple field as one cell. The cell value is the string the tuple
     * carries for {@code field}, i.e. the value that came through Storm.
     */
    private void putField(String rowKey, String group, String field, long ts, Tuple tuple) throws Exception {
        tableClient.insertRow(TABLE_NAME, rowKey, group, field, ts, tuple.getStringByField(field));
    }

    /**
     * Builds the row key by concatenating the tuple's MSISDN and start-time field values.
     */
    private String createRowKeyByTuple(Tuple tuple) {
        return tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD)
                + tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD);
    }

    /**
     * No-op.
     * NOTE(review): the HBase client is never released; check whether HTableClient
     * exposes a close/shutdown method that should be called here.
     */
    @Override
    public void cleanup() {
    }

    /** Declares the output fields produced by {@code MobileNetLogVal.createFields()}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
    }

    /** Returns {@code null}: this bolt supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
|