zstu posted on 2017-4-12 17:26:03

Flume's Kafka Channel

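My first tier uses Kafka to collect data. A Flume agent on an interface server then reads from that first-tier Kafka and forwards the data to the Kafka in a CDH cluster. The Flume channel is a KafkaChannel, configured as follows:

tier1.sources=src_kafka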
tier1.channels = ch_kafka
tier1.sinks = sink_kafka

# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = event
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100

# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=event
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181

When I send data to the first-tier Kafka, the messages received by the Kafka in the CDH cluster contain extra characters.
For example, I send:
aaaaaa
ccccccccccccccccccccccccccccccccc

The CDH Kafka receives:
topic
eventtimestamp1491974576638$aaaaaa

topic
eventtimestamp1491975454178Bccccccccccccccccccccccccccccccccc


How can I receive only the data that was sent, without extra characters such as topic and eventtimestamp?

2017 posted on 2017-4-12 18:28:23

tier1.sources=src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka

# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = test1,test2
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100

# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=channels
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181

OP, try changing the config to the above and see what happens.

zstu posted on 2017-4-12 18:38:29

2017 posted on 2017-4-12 18:28
tier1.sources=src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka


With that change, the Kafka on CDH no longer receives any data.

starrycheng posted on 2017-4-12 19:34:54

zstu posted on 2017-4-12 18:38
With that change, the Kafka on CDH no longer receives any data.

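The data enters Flume through the KafkaSource, which adds header information to each event.
The only way to change this is to modify the source code; see the following thread for reference:
"Timestamp issue when Flume writes to a Kafka topic"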
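
For context, the Flume user guide documents a parseAsFlumeEvent property on the Kafka channel (default true). With the default, events are stored in the channel topic as Avro datums that carry the headers together with the body, which would match the topic and timestamp strings seen in the received messages. Before patching the source, it may be worth trying the setting below. This is only a sketch: it assumes the Flume build shipped with this CDH release supports the property, and the effect on the written messages should be checked on a test topic first.

```properties
# Assumption: this Flume build's KafkaChannel honors parseAsFlumeEvent.
# The default (true) wraps each event, headers included, in an Avro FlumeEvent.
# Setting it to false is intended for topics that hold plain message bodies.
tier1.channels.ch_kafka.parseAsFlumeEvent = false
```

With this turned off, the headers added by the KafkaSource would presumably not travel through the channel, so anything downstream that relies on them would need another way to get that information.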



