立即注册 登录
About云-梭伦科技 返回首页

百事的个人空间 https://aboutyun.com/?47090 [收藏] [复制] [分享] [RSS]

日志

storm整合hdfs的一些问题,求各路大神帮忙看看

已有 1315 次阅读 | 2016-8-19 14:00 | 个人分类:storm

package com.apache.hdfs;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class StormToHDFSTopology1 {
public static class hdfsSpout extends BaseRichSpout {

/**  */
private static final long serialVersionUID = 1021968726187065581L;

private SpoutOutputCollector collector;
private Random ran=new Random();
private String[] records = {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35" };

public void nextTuple() {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
Date date = new Date(System.currentTimeMillis());
String minute = df.format(date);
String recode = records[ran.nextInt(records.length)];
this.collector.emit(new Values(minute,recode));
Utils.sleep(1);
}

public void open(Map config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "recode"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter(" ");
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
       .withPath("/user/hive/warehouse/").withPrefix("new").withExtension(".txt");
HdfsBolt hdfsBolt=new HdfsBolt()
.withFsUrl("hdfs://192.168.10.XXX:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new hdfsSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
Config conf = new Config();
 
String name = StormToHDFSTopology1.class.getSimpleName();
if(args.length==0){
conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.shutdown();
}else{
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}
}
}


路过

雷人

握手

鲜花

鸡蛋

全部作者的其他最新日志

发表评论 评论 (1 个评论)

回复 百事 2016-8-19 14:02
以上为源码。我把它提交到 storm 集群后没有报错,但数据也没有被写入 hdfs 中。以下为 storm 提交拓扑时的命令和输出:
[root@/home/app/storm/bin/storm jar hdfs.jar com.hnjz.hdfs.StormToHDFSTopology1 arg
Running: /home/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/app/storm -Dstorm.log.dir=/home/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/app/storm/hdfs.jar:/home/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/app/storm/lib/log4j-api-2.1.jar:/home/app/storm/lib/clojure-1.7.0.jar:/home/app/storm/lib/disruptor-3.3.2.jar:/home/app/storm/lib/kryo-3.0.3.jar:/home/app/storm/lib/slf4j-api-1.7.7.jar:/home/app/storm/lib/storm-hdfs-1.0.1.jar:/home/app/storm/lib/minlog-1.3.0.jar:/home/app/storm/lib/servlet-api-2.5.jar:/home/app/storm/lib/storm-rename-hack-1.0.1.jar:/home/app/storm/lib/asm-5.0.3.jar:/home/app/storm/lib/objenesis-2.1.jar:/home/app/storm/lib/hadoop-common-2.2.0.jar:/home/app/storm/lib/log4j-core-2.1.jar:/home/app/storm/lib/hadoop-hdfs-2.2.0.jar:/home/app/storm/lib/reflectasm-1.10.1.jar:/home/app/storm/lib/storm-core-1.0.1.jar:/home/app/storm/lib/log4j-over-slf4j-1.6.6.jar:hdfs.jar:/home/app/storm/conf:/home/app/storm/bin -Dstorm.jar=hdfs.jar com.hnjz.hdfs.StormToHDFSTopology1 arg
741  [main] INFO  o.a.s.u.TupleUtils - Enabling tick tuple with interval [15]
896  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -4740143384027746980:-6280803895958214705
1002 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1117 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar hdfs.jar to assigned location: /home/app/storm/storm-local/nimbus/inbox/stormjar-3ccf15b5-8419-4c1f-ae6a-20172203a381.jar
Start uploading file 'hdfs.jar' to '/home/app/storm/storm-local/nimbus/inbox/stormjar-3ccf15b5-8419-4c1f-ae6a-20172203a381.jar' (75985 bytes)
[==================================================] 75985 / 75985
File 'hdfs.jar' uploaded to '/home/app/storm/storm-local/nimbus/inbox/stormjar-3ccf15b5-8419-4c1f-ae6a-20172203a381.jar' (75985 bytes)
1133 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/app/storm/storm-local/nimbus/inbox/stormjar-3ccf15b5-8419-4c1f-ae6a-20172203a381.jar
1133 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormToHDFSTopology1 in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-4740143384027746980:-6280803895958214705"}
1247 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormToHDFSTopology1

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条