我是用 Ambari 安装的 Kafka。
下面是我的部分代码：
// Broker discovery: the spout looks up Kafka brokers through the ZooKeeper
// connection string held in `zks`.
BrokerHosts brokerHosts = new ZkHosts(zks);

// Disabled alternative: pin partition 0 to a fixed broker instead of using
// ZooKeeper discovery. The original snippet closed this section with "*/" but
// had no opening "/*", which is a syntax error, so it is kept as line comments.
// GlobalPartitionInformation partitionInformation = new GlobalPartitionInformation(topic);
// partitionInformation.addPartition(0, broker);
// StaticHosts hosts = new StaticHosts(partitionInformation);

// Spout configuration for `topic`, using `zkRoot` and `id` as passed by the caller.
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);

// Emit each Kafka message as a string tuple.
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// NOTE(review): with this flag set, offsets previously committed to ZooKeeper
// are presumably ignored on startup - confirm this is intended outside testing.
spoutConf.ignoreZkOffsets = true;

// ZooKeeper server and port set on the spout config.
// NOTE(review): presumably where the spout stores its consumer offsets - confirm.
spoutConf.zkServers = Arrays.asList("10.0.40.3");
spoutConf.zkPort = 2181;
// Assemble the topology: one Kafka spout feeds the upper-casing bolt, and the
// upper-casing bolt's output is shuffle-grouped to both the HDFS bolt and the
// realtime bolt. Every component is registered with a trailing argument of 1.
final String readerId = "kafka-reader";
final String upperId = "to-upper";

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(readerId, new KafkaSpout(spoutConf), 1);
builder.setBolt(upperId, new KafkaWordToUpperCase(), 1)
        .shuffleGrouping(readerId);
builder.setBolt("hdfs-bolt", hdfsBolt, 1)
        .shuffleGrouping(upperId);
builder.setBolt("realtime", new RealtimeBolt(), 1)
        .shuffleGrouping(upperId);
|