分享

最简洁的kafka开发实例

hyj 2014-9-26 23:14:51 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 88104
问题导读
1.如何启动kafka?
2.如何通过代码实现生产者例子 ?
3.如何通过代码实现消费者例子






1.启动kafka。

//启动zookeeper server (用&是为了能退出命令行):
bin/zookeeper-server-start.sh config/zookeeper.properties  &
//启动kafka server:  
bin/kafka-server-start.sh config/server.properties  &

2.新建一个生产者例子

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaTest {
    public static void main(String[] args) {  
        Properties props = new Properties();  
        props.put("zk.connect", "10.103.22.47:2181");  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        props.put("metadata.broker.list", "10.103.22.47:9092");
        props.put("request.required.acks", "1");
        //props.put("partitioner.class", "com.xq.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props);  
        Producer<String, String> producer = new Producer<String, String>(config);  
        String ip = "192.168.2.3";
        String msg ="this is a messageuuu!";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg);  
        producer.send(data);
        producer.close();  
    }  

}

3.新建一个消费者例子

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;


public class ConsumerSample {

    public static void main(String[] args) {  
        // specify some consumer properties  
        Properties props = new Properties();  
        props.put("zookeeper.connect", "10.103.22.47:2181");  
        props.put("zookeeper.connectiontimeout.ms", "1000000");  
        props.put("group.id", "test_group");  

            // Create the connection to the cluster  
        ConsumerConfig consumerConfig = new ConsumerConfig(props);  
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);  


        Map<String,Integer> topics = new HashMap<String,Integer>();  
        topics.put("test", 2);  
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics);  
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");
        ExecutorService threadPool = Executors.newFixedThreadPool(2);  
        for (final KafkaStream<byte[], byte[]> stream : streams) {  
            threadPool.submit(new Runnable() {  
                public void run() {  
                    for (MessageAndMetadata msgAndMetadata : stream) {  
                        // process message (msgAndMetadata.message())  
                        System.out.println("topic: " + msgAndMetadata.topic());  
                        Message message = (Message) msgAndMetadata.message();  
                        ByteBuffer buffer = message.payload();  
                        byte[] bytes = new byte[message.payloadSize()];  
                        buffer.get(bytes);  
                        String tmp = new String(bytes);  
                        System.out.println("message content: " + tmp);  
                    }  
                }  
            });  
        }   
    }
}


已有(6)人评论

跳转到指定楼层
desehawk 发表于 2015-3-12 10:21:34
string2020 发表于 2015-3-12 10:10
消息发布订阅系统 有哪些应用场景

Kafka的应用场景:
1 消息队列
比 起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统 一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。

2 行为跟踪
Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3 元信息监控
作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4 日志收集
日 志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的 系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5 流处理
这 个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行 阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返 还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。

6 事件源
事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7 持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

回复

使用道具 举报

desehawk 发表于 2015-3-12 10:28:32
string2020 发表于 2015-3-12 10:10
消息发布订阅系统 有哪些应用场景

在 大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合 大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka就出现了。Kafka可以起到两个作用:

降低系统组网复杂度。
降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用。

回复

使用道具 举报

hery 发表于 2015-12-22 16:13:53
代码中的  topics.put("test", 2);  这里的2是什么意思啊?
回复

使用道具 举报

niuniu9631 发表于 2015-12-22 21:49:19
学习了,十分感谢分享
回复

使用道具 举报

mseaspring 发表于 2016-3-1 18:15:16
为什么我这样做,消费不到数据,确认数据是存在的。
只能打印到;System.out.println("topic: " + msgAndMetadata.topic());  
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条