package com.apache.hdfs;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
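/**
 * Demo topology: a spout replays a few canned device-log records and an
 * HdfsBolt appends them, one line per tuple, to files in HDFS, rotating
 * each file at 5 MB.
 *
 * With no arguments it runs briefly in a LocalCluster; with any argument
 * it submits to a real cluster, e.g. (the jar name below is only
 * illustrative, assuming the standard storm CLI):
 *
 *   storm jar storm-hdfs-example.jar com.apache.hdfs.StormToHDFSTopology1 remote
 */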
public class StormToHDFSTopology1 {
/** Spout that emits (minute, record) pairs drawn from a few canned sample records. */
public static class HdfsSpout extends BaseRichSpout {
private static final long serialVersionUID = 1021968726187065581L;
private SpoutOutputCollector collector;
private Random random = new Random();
// Minute-resolution timestamps match the "minute" field the topology groups on.
private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH-mm");
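// A few canned, space-delimited log records to replay at random.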
private String[] records = {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35" };
public void nextTuple() {
// Key each record by the current minute so that fieldsGrouping("minute")
// routes all records from the same minute to the same HdfsBolt task.
String minute = df.format(new Date());
String record = records[random.nextInt(records.length)];
this.collector.emit(new Values(minute, record));
// Back off briefly between emits; nextTuple() must not block for long.
Utils.sleep(1);
}
public void open(Map config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "record"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
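// Write each tuple as one line, joining its fields with a single space.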
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" ");
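// Sync the underlying file to HDFS after every 1000 tuples.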
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
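// Rotate to a new file once the current one reaches 5 MB.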
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
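// Name output files with prefix "new" and extension ".txt" under
// /user/hive/warehouse/; DefaultFileNameFormat also embeds the task id
// and a timestamp in each name.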
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/user/hive/warehouse/").withPrefix("new").withExtension(".txt");
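// Assemble the bolt; replace the XXX placeholder with the actual NameNode host.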
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://192.168.10.XXX:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
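// Three spout tasks feed two bolt tasks; fieldsGrouping on "minute" sends
// all records for a given minute to the same bolt task.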
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new HdfsSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
Config conf = new Config();
String name = StormToHDFSTopology1.class.getSimpleName();
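// No arguments: run in-process for ~10 seconds, then shut down.
// Any argument: submit to the configured Storm cluster.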
if (args.length == 0) {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
} else {
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}
}
}