配置文件内容:
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
# 配置sources
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType=DataStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.writeFormat=Text
#Specify the channel the sink should use
flumetohdfs_agent.sinks.hdfs_sink.channel = mem_channel
# Each channel's type is defined.
flumetohdfs_agent.channels.mem_channel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
flumetohdfs_agent.channels.mem_channel.capacity = 100000
flumetohdfs_agent.channels.mem_channel.transactionCapacity = 10000
帮助看看有没有问题,感谢啦 |