Data format of the source test file:
opcode1, col1:time2,24
opcode2, col2:time3,24
opcode1, col1:time1,25
opcode1, col1:time2,25
opcode2, col2:time3,25
Here opcode1/opcode2 are the HBase row keys, col1/col2 are the column families, time1/time2/time3 are the column names (qualifiers), and 24/25 are the actual values.
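To make the intended mapping concrete, here is a minimal Python sketch that splits one log line into row key, column family, column name, and value. The regular expression and the names `LINE_PATTERN`, `Cell`, and `parse_line` are my own assumptions for illustration, not part of Flume or HBase. It assumes that whitespace around the commas is optional, as in the sample lines.

```python
import re
from typing import NamedTuple, Optional

# Assumed line layout: "<row key>, <column family>:<column name>,<value>"
# Whitespace around the commas is treated as optional.
LINE_PATTERN = re.compile(
    r"^\s*([^,\s]+)\s*,\s*([^:\s]+):([^,\s]+)\s*,\s*(\S+)\s*$"
)


class Cell(NamedTuple):
    """One HBase cell as described by a single log line (hypothetical type)."""
    row_key: str
    family: str
    qualifier: str
    value: str


def parse_line(line: str) -> Optional[Cell]:
    """Return the parsed cell, or None if the line does not match the layout."""
    match = LINE_PATTERN.match(line)
    if match is None:
        return None
    return Cell(*match.groups())


if __name__ == "__main__":
    for sample in ("opcode1, col1:time2,24", "opcode2, col2:time3,25"):
        print(parse_line(sample))
```

This only illustrates the parsing I am after. It says nothing about how the Flume sink itself would have to be configured.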
The Flume configuration file is as follows:
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type = exec
agent.sources.s1.command = tail -f /home/xltest/flumeForHbase.log
agent.sources.s1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.keep-alive = 10
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 100000
agent.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.k1.channel = c1
agent.sinks.k1.table = flumetest
agent.sinks.k1.columnFamily = col1
agent.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
With this configuration, the data ends up in HBase in the following format:
hbase(main):050:0> scan 'flumetest'
ROW COLUMN+CELL
1862084728184-WyPJrukpPQ-0 column=col1:payload, timestamp=1373959739740, value=opcode2, col2:time3,22
1862084728203-WyPJrukpPQ-1 column=col1:payload, timestamp=1373959739740, value=opcode1,col1:time1,23
1862084728203-WyPJrukpPQ-2 column=col1:payload, timestamp=1373959739740, value=opcode1, col1:time2,23
1862084728203-WyPJrukpPQ-3 column=col1:payload, timestamp=1373959739740, value=opcode2, col2:time3,23
1862084728203-WyPJrukpPQ-4 column=col1:payload, timestamp=1373959739740, value=opcode1,col1:time1,24
1862084728204-WyPJrukpPQ-5 column=col1:payload, timestamp=1373959739740, value=opcode1, col1:time2,24
1862084728204-WyPJrukpPQ-6 column=col1:payload, timestamp=1373959739740, value=opcode2, col2:time3,24
1862084728204-WyPJrukpPQ-7 column=col1:payload, timestamp=1373959739740, value=opcode1,col1:time1,25
1862084728204-WyPJrukpPQ-8 column=col1:payload, timestamp=1373959739740, value=opcode1, col1:time2,25
The expected data format is:
hbase(main):003:0> scan 'flumetest' ,{COLUMNS =>'col1:time1'}
ROW COLUMN+CELL
opcode1 column=col1:time1, timestamp=1373959710160, value=100
1 row(s) in 0.0140 seconds
Question:
How should Flume be configured to produce the expected data format?