This post was last edited by hyj on 2014-9-26 23:08.
Questions this post addresses:
1. What is a Producer in Kafka?
2. How do you implement Producer and Consumer applications?
A Producer is an application that creates messages and publishes them to a Kafka broker. Producers differ in nature: for example, front-end applications, backend services, proxy services, adapters for legacy systems, and Hadoop-based producers. They can also be implemented in different languages, such as Java, C, and Python.
The following walks through writing a simple Producer and Consumer. On the producer side, the class ClusterProducer sends simple messages to the Kafka broker:
```java
import java.util.Properties;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ClusterProducer extends Thread {
    private static final Log log = LogFactory.getLog(ClusterProducer.class);

    public void sendData() {
        Random rnd = new Random();
        Properties props = PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
        if (props == null) {
            log.error("can't load specified file " + PropertiesSettings.PRODUCER_FILE_NAME);
            return;
        }
        // set the producer configuration properties
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // send the data
        int count = 1;
        KeyedMessage<String, String> data;
        while (count < 100) {
            String sign = "*";
            String ip = "192.168.2." + rnd.nextInt(255);
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < count; i++) {
                sb.append(sign);
            }
            log.info("set data:" + sb);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            data = new KeyedMessage<String, String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
            producer.send(data);
            count++;
        }
        producer.close();
    }

    public void run() {
        sendData();
    }

    public static void main(String[] args) {
        new ClusterProducer().sendData();
    }
}
```
On the consumer side, define a Consumer class that reads data from the corresponding topic:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {
    private static final Log log = LogFactory.getLog(Consumer.class);
    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            log.info("+message: " + new String(it.next().message()));
        }
    }

    public static void main(String[] args) {
        Consumer client = new Consumer("cluster_statistics_topic");
        client.start(); // the original post is cut off here; start() runs the consumer thread
    }
}
```
Helper classes. First, an interface holding the shared constants:
```java
public interface PropertiesSettings {

    final static String CONSUMER_FILE_NAME = "consumer.properties";
    final static String PRODUCER_FILE_NAME = "producer.properties";
    final static String TOPIC_NAME = "cluster_statistics_topic";
    final static String TOPIC_A = "cluster_statistics_topic_A";
}
```
```java
package com.kafka.utils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @author JohnLiu
 * @version 0.1.0
 * @date 2014/8/27
 */
public class PropertiesParser {

    private static final Log log = LogFactory.getLog(PropertiesParser.class);
    /* properties file type */
    Properties props = null;

    /* constructor method */
    public PropertiesParser(Properties props) {
        this.props = props;
    }

    /**
     * Get the trimmed String value of the property with the given
     * <code>name</code>. If the value is the empty String (after
     * trimming), then it returns null.
     */
    public String getStringProperty(String name) {
        return getStringProperty(name, null);
    }

    /**
     * Get the trimmed String value of the property with the given
     * <code>name</code>, or the given default value if the value is
     * null or empty after trimming.
     */
    public String getStringProperty(String name, String def) {
        String val = props.getProperty(name, def);
        if (val == null) {
            return def;
        }
        val = val.trim();
        return (val.length() == 0) ? def : val;
    }

    private Properties loadPropertiesFile() {
        Properties props = new Properties();
        ClassLoader cl = getClass().getClassLoader();
        if (cl == null)
            cl = findClassloader();
        if (cl == null) {
            // fail fast instead of swallowing the error and then hitting an NPE below
            throw new IllegalStateException("Unable to find a class loader on the current thread or class.");
        }
        InputStream in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME);
        try {
            props.load(in);
        } catch (IOException ioe) {
            log.error("can't load " + PropertiesSettings.CONSUMER_FILE_NAME, ioe);
        }
        return props;
    }

    private ClassLoader findClassloader() {
        // work-around: set context loader for windows-service started jvms (QUARTZ-748)
        if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        }
        return Thread.currentThread().getContextClassLoader();
    }

    public static Properties getProperties(final String fileName) {
        Properties props = new Properties();
        InputStream in = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream(fileName);
        try {
            props.load(in);
        } catch (IOException ioe) {
            log.error("can't load " + fileName, ioe);
        }
        return props;
    }
}
```
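To see the trimming contract of `getStringProperty` in isolation, here is a minimal stand-alone sketch; the property names and padded values are made up for illustration, and the method body mirrors the one in PropertiesParser above:

```java
import java.util.Properties;

public class TrimDemo {
    // Mirrors PropertiesParser.getStringProperty: trim the value and
    // fall back to the default when the result is null or empty.
    static String getStringProperty(Properties props, String name, String def) {
        String val = props.getProperty(name, def);
        if (val == null) {
            return def;
        }
        val = val.trim();
        return (val.length() == 0) ? def : val;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "  cluster_group  "); // padded value
        props.put("empty.key", "   ");              // blank after trimming

        System.out.println(getStringProperty(props, "group.id", null));   // cluster_group
        System.out.println(getStringProperty(props, "empty.key", "def")); // def
        System.out.println(getStringProperty(props, "missing", "def"));   // def
    }
}
```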
Configuration file consumer.properties:
```properties
zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181
group.id=cluster_group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
```
Configuration file producer.properties:
```properties
metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092
serializer.class=kafka.serializer.StringEncoder
#partitioner.class=com.kafka.producer.SimplePartitioner
request.required.acks=1
```
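The commented-out `partitioner.class` line refers to a custom partitioner, `com.kafka.producer.SimplePartitioner`, which the post does not show. In the 0.8 producer API a partitioner maps a message key to a partition index; since the producer above uses IP strings like "192.168.2.x" as keys, a typical implementation partitions on the last octet. The following is only a sketch of that partitioning logic, written stand-alone for clarity; the real class would implement the `kafka.producer.Partitioner` interface:

```java
public class SimplePartitioner {
    // Sketch of the partition() body: take the last octet of an
    // IP-style key and map it onto [0, numPartitions). Keys without
    // a '.' all land in partition 0.
    static int partition(Object key, int numPartitions) {
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            return Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(partition("192.168.2.75", 4)); // 75 % 4 = 3
        System.out.println(partition("192.168.2.8", 4));  // 8 % 4 = 0
    }
}
```

Because partitioning is deterministic in the key, all messages from one "IP" always go to the same partition, which preserves per-key ordering.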
Run the two programs above: the Producer sends messages to the topic and the Consumer receives them.
Q: When programming against Kafka, which folder are scala-compiler.jar and scala-library.jar in?
A: Those are the Scala compiler and runtime libraries (Kafka itself manages its dependencies with sbt), so it is recommended to use sbt, which downloads all dependencies automatically.
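Following that sbt advice, a project using the 0.8-era client can declare just the Kafka artifact and let sbt resolve scala-library transitively. A minimal build.sbt sketch; the version numbers here are illustrative of the 0.8.x era, not prescriptive:

```scala
// build.sbt — illustrative sketch, adjust versions to match your cluster
name := "kafka-demo"

scalaVersion := "2.10.4"

// pulls in scala-library (and Kafka's other dependencies) transitively
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
```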