Kafka serialization in the Flume Kafka source

The source's Kafka consumer is configured with these deserializers:

key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

In the configuration I enabled event deserialization with hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true, but what comes out is still the serialized bytes.
Any ideas?
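For context, useFlumeEventFormat = true tells the Kafka source to interpret each record value as a Flume event serialized with Avro (the AvroFlumeEvent schema from flume-ng-sdk) instead of taking the raw bytes as the event body. Below is a minimal sketch for decoding such a record by hand, which is useful for checking what is actually sitting on the topic; the class name FlumeEventDecoder is just for illustration, and it assumes flume-ng-sdk and Avro are on the classpath:

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flume.source.avro.AvroFlumeEvent;

// Decodes one raw Kafka record value that was written in the Flume Avro
// event format -- the format useFlumeEventFormat = true expects to find.
public class FlumeEventDecoder {

    private static final SpecificDatumReader<AvroFlumeEvent> READER =
            new SpecificDatumReader<>(AvroFlumeEvent.class);

    public static void dump(byte[] kafkaValue) throws Exception {
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(kafkaValue), null);
        AvroFlumeEvent event = READER.read(null, decoder);

        // Headers survive only when the producing side also wrote the
        // Flume Avro format (e.g. a KafkaSink with useFlumeEventFormat = true).
        for (Map.Entry<CharSequence, CharSequence> h : event.getHeaders().entrySet()) {
            System.out.println(h.getKey() + " = " + h.getValue());
        }
        ByteBuffer body = event.getBody();
        byte[] bytes = new byte[body.remaining()];
        body.get(bytes);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}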
The agent configuration file is as follows:
# Name of the custom source
hdfsAgent.sources = hdfsKafkaSource
# Name of the custom channel
hdfsAgent.channels = testHdfsChannel
# Name of the custom sink
hdfsAgent.sinks = hdfsSink
# Channel the source writes to
hdfsAgent.sources.hdfsKafkaSource.channels = testHdfsChannel
# Channel the sink reads from; note it is the singular "channel" here
hdfsAgent.sinks.hdfsSink.channel = testHdfsChannel
#-------- hdfsKafkaSource configuration -----------------
# Source type
hdfsAgent.sources.hdfsKafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# ZooKeeper address for Kafka
#
# Note: this must be the ZooKeeper ensemble that Kafka itself uses
#
#hdfsAgent.sources.hdfsKafkaSource.kafka.zookeeperConnect = nn1.hadoop:2181,nn2.hadoop:2181,s1.hadoop:2181
hdfsAgent.sources.hdfsKafkaSource.kafka.zookeeperConnect = m1.test:2181,m2.test:2181,s1.test:2181
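# (zookeeperConnect is deprecated since Flume 1.7; newer versions rely on kafka.bootstrap.servers below)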
# Kafka brokers
hdfsAgent.sources.hdfsKafkaSource.kafka.bootstrap.servers = m1.test:9092,m2.test:9092,s1.test:9092
# Kafka topics to consume, comma-separated
hdfsAgent.sources.hdfsKafkaSource.kafka.topics = test
# Read records as Flume Avro event format instead of raw bytes
hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true
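# NB: the Flume documentation lists this property as plain useFlumeEventFormat,
# without the kafka. prefix; worth double-checking which form your Flume version honors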
# Consumer group id
hdfsAgent.sources.hdfsKafkaSource.kafka.consumer.group.id = test
hdfsAgent.sources.hdfsKafkaSource.kafka.batchSize = 1000
hdfsAgent.sources.hdfsKafkaSource.batchDurationMillis = 2000
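# A batch is written to the channel after batchSize events or
# batchDurationMillis ms, whichever comes first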
#------- memory channel configuration ---------------
# Channel type
hdfsAgent.channels.testHdfsChannel.type = memory
hdfsAgent.channels.testHdfsChannel.capacity = 10000
hdfsAgent.channels.testHdfsChannel.transactionCapacity = 1000
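# transactionCapacity must be at least as large as the batch size
# of the attached source and sink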
#--------- Sink configuration ------------------
hdfsAgent.sinks.hdfsSink.type = hdfs
# Note: output lands in a per-topic subdirectory, partitioned by date
hdfsAgent.sinks.hdfsSink.hdfs.path = hdfs:///data/original/%{topic}/%Y%m%d
# Minimum number of replicas per HDFS block; 1 keeps block replication from triggering early file rolls
hdfsAgent.sinks.hdfsSink.hdfs.minBlockReplicas = 1
# Roll to a new file once the temporary file reaches this many bytes;
# 0 disables size-based rolling. This is the size before compression.
hdfsAgent.sinks.hdfsSink.hdfs.rollSize = 8435456
hdfsAgent.sinks.hdfsSink.hdfs.rollCount = 0
hdfsAgent.sinks.hdfsSink.hdfs.rollInterval = 0
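# With rollCount and rollInterval both 0, files roll on size only (~8 MB)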
hdfsAgent.sinks.hdfsSink.hdfs.roundValue = 5
hdfsAgent.sinks.hdfsSink.hdfs.roundUnit = minute
hdfsAgent.sinks.hdfsSink.hdfs.round = true
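# round/roundValue/roundUnit: the timestamp used in the path is rounded
# down to the nearest 5 minutes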
# File prefix and suffix
hdfsAgent.sinks.hdfsSink.hdfs.filePrefix=%{topic}_%H%M%S_%{agentHost}
hdfsAgent.sinks.hdfsSink.hdfs.fileSuffix=.log
hdfsAgent.sinks.hdfsSink.hdfs.inUseSuffix = .tmp
hdfsAgent.sinks.hdfsSink.hdfs.idleTimeout = 120
hdfsAgent.sinks.hdfsSink.hdfs.writeFormat = Text
hdfsAgent.sinks.hdfsSink.hdfs.fileType = DataStream
hdfsAgent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
hdfsAgent.sinks.hdfsSink.hdfs.maxOpenFiles = 1000
hdfsAgent.sinks.hdfsSink.hdfs.batchSize= 100
hdfsAgent.sinks.hdfsSink.hdfs.connect-timeout=80000
hdfsAgent.sinks.hdfsSink.hdfs.callTimeout=120000
# Host interceptor: stamps each event with the agent hostname in the
# agentHost header (used by filePrefix above)
hdfsAgent.sources.hdfsKafkaSource.interceptors = i1
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.type = host
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.useIP = false
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.hostHeader = agentHost
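One thing worth checking: per the Flume docs, useFlumeEventFormat on the source only decodes data that was produced in the Flume Avro event format, typically by a KafkaSink with the same flag set (or a Kafka channel with parseAsFlumeEvent). Here is a minimal sketch of what such a producer looks like, for comparison against whatever is writing to the test topic; the class name FlumeFormatProducer and the origin header are illustrative:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlumeFormatProducer {
    public static void main(String[] args) throws Exception {
        // Wrap the payload in the AvroFlumeEvent schema, mirroring what a
        // KafkaSink with useFlumeEventFormat = true puts on the wire.
        AvroFlumeEvent event = new AvroFlumeEvent(
                Collections.singletonMap((CharSequence) "origin", (CharSequence) "demo"),
                ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8)));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        new SpecificDatumWriter<>(AvroFlumeEvent.class).write(event, encoder);
        encoder.flush();

        Properties props = new Properties();
        props.put("bootstrap.servers", "m1.test:9092,m2.test:9092,s1.test:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", out.toByteArray())).get();
        }
    }
}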