Flume's Kafka Channel
I use Kafka as the first tier to collect data; a Flume agent on an interface server then consumes from the first-tier Kafka and forwards the data to the Kafka in the CDH cluster. My Flume channel is a KafkaChannel, configured as follows:
tier1.sources = src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka
# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = event
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100
# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=event
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181
When I send data into the first-tier Kafka, the data received by the Kafka in the CDH cluster contains extra characters.
For example, I send:
aaaaaa
ccccccccccccccccccccccccccccccccc
The CDH Kafka receives:
topic
eventtimestamp1491974576638$aaaaaa
topic
eventtimestamp1491975454178Bccccccccccccccccccccccccccccccccc
My question: how do I receive only the data that was sent, without the topic, eventtimestamp, and other extra characters?
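For reference, those extra characters are consistent with the Kafka channel writing Avro datums with the FlumeEvent schema into its topic, so each Kafka message carries the event headers (topic, timestamp) as well as the body. Below is a minimal sketch of decoding one such message with flume-ng-sdk's AvroFlumeEvent class; the wrapper class and method name are illustrative, not part of Flume:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.flume.source.avro.AvroFlumeEvent;

    public class ChannelMessageDump {

        // Decode one raw Kafka message taken from the channel topic.
        public static void dump(byte[] rawKafkaValue) throws Exception {
            SpecificDatumReader<AvroFlumeEvent> reader =
                    new SpecificDatumReader<>(AvroFlumeEvent.class);
            BinaryDecoder decoder =
                    DecoderFactory.get().binaryDecoder(rawKafkaValue, null);
            AvroFlumeEvent event = reader.read(null, decoder);

            // Headers added along the way, e.g. topic=event, timestamp=1491974576638
            for (Map.Entry<CharSequence, CharSequence> h : event.getHeaders().entrySet()) {
                System.out.println(h.getKey() + " = " + h.getValue());
            }

            // The original payload, e.g. "aaaaaa"
            ByteBuffer body = event.getBody();
            byte[] bytes = new byte[body.remaining()];
            body.get(bytes);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }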
tier1.sources=src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka
# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = test1,test2
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100
# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=channels
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181
OP, try changing your config to the above and see what happens.
2017 posted on 2017-4-12 18:28:
tier1.sources=src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka
After changing it like that, the Kafka on the CDH cluster receives no data at all.
zstu posted on 2017-4-12 18:38:
After changing it like that, the Kafka on the CDH cluster receives no data at all.
The data enters Flume through the KafkaSource, which adds header information to each event.
This can only be fixed by modifying the source code; for reference, see the thread
"flume写kafka topic带有时间戳问题" (the issue of Flume writing a timestamp into the Kafka topic).
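To make that concrete, here is a minimal sketch of the kind of source change being described: in the Kafka channel's put path, hand Kafka only the event body instead of the Avro-wrapped FlumeEvent. The writeRawBodyOnly flag and the surrounding class are illustrative, not the actual Flume source (later Flume releases appear to expose a similar switch through the channel's parseAsFlumeEvent property):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.flume.Event;
    import org.apache.flume.source.avro.AvroFlumeEvent;

    public class RawBodySerializer {

        // Hypothetical switch: when true, write only the body so downstream
        // consumers of the channel topic see exactly what was sent upstream.
        private final boolean writeRawBodyOnly;

        public RawBodySerializer(boolean writeRawBodyOnly) {
            this.writeRawBodyOnly = writeRawBodyOnly;
        }

        // Serialize a Flume event into the bytes produced to the channel topic.
        public byte[] serializeValue(Event event) throws IOException {
            if (writeRawBodyOnly) {
                // Patched behaviour: drop the headers (topic, timestamp, ...) entirely.
                return event.getBody();
            }

            // Stock behaviour: Avro-encode the whole FlumeEvent, headers included,
            // which is why the CDH-side consumer sees "topic...timestamp..." bytes.
            Map<CharSequence, CharSequence> headers = new HashMap<>();
            for (Map.Entry<String, String> h : event.getHeaders().entrySet()) {
                headers.put(h.getKey(), h.getValue());
            }
            AvroFlumeEvent avroEvent =
                    new AvroFlumeEvent(headers, ByteBuffer.wrap(event.getBody()));

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            new SpecificDatumWriter<>(AvroFlumeEvent.class).write(avroEvent, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }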