Can Flume sink to a local file directory?

aqi915 posted on 2015-6-20 12:29:35
Uh, did my reply just now fail to post?

My goal with Flume: watch a local directory for new files (/home/see/seeupdate/blacklistOperRecord) and move them to another local directory (/home/see/seeupdate/blacklistOperRecord/temp).

Command executed:
/home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &
Error output:
FJXM-DM-TEST-NAG-01 flume/bin> /home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &
[1] 126950
FJXM-DM-TEST-NAG-01 flume/bin> Info: Sourcing environment configuration script /home/see/flume/flume/conf/flume-env.sh
+ exec /home/see/seeupdate/jdk1.6.0_16/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/see/flume/flume/conf:/home/see/flume/flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -n mobileAgent -f /home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 00:00:51,367 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2015-06-20 00:00:51,372 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2015-06-20 00:00:51,376 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 00:00:51,377 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 00:00:51,384 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for hdfsSinkContentDownload: type
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfsSinkContentDownload Agent: mobileAgent
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: mobileAgent, initial-configuration: AgentConfiguration[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{directory=/home/see/seeupdate/blacklistOperRecord/temp, type=file_roll, channel=memoryChannelContentDownload} }}

2015-06-20 00:00:51,390 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel memoryChannelContentDownload
2015-06-20 00:00:51,397 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: hdfsSinkContentDownload using FILE_ROLL
2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for mobileAgent
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{directory=/home/see/seeupdate/blacklistOperRecord/temp, type=file_roll, channel=memoryChannelContentDownload} }}

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:memoryChannelContentDownload

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks hdfsSinkContentDownload

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources spooldirSourceContentDownload

2015-06-20 00:00:51,401 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [mobileAgent]
2015-06-20 00:00:51,401 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2015-06-20 00:00:51,410 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel memoryChannelContentDownload type memory
2015-06-20 00:00:51,415 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel memoryChannelContentDownload
2015-06-20 00:00:51,416 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source spooldirSourceContentDownload, type spooldir
2015-06-20 00:00:51,427 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfsSinkContentDownload, type: file_roll
2015-06-20 00:00:51,431 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:432)] Sink hdfsSinkContentDownload has been removed due to an error during configuration
java.lang.IllegalArgumentException: Directory may not be null
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
        at org.apache.flume.sink.RollingFileSink.configure(RollingFileSink.java:84)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
2015-06-20 00:00:51,433 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel memoryChannelContentDownload connected to [spooldirSourceContentDownload]
2015-06-20 00:00:51,441 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{spooldirSourceContentDownload=EventDrivenSourceRunner: { source:Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord } }} sinkRunners:{} channels:{memoryChannelContentDownload=org.apache.flume.channel.MemoryChannel{name: memoryChannelContentDownload}} }
2015-06-20 00:00:51,451 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel memoryChannelContentDownload
2015-06-20 00:00:51,496 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memoryChannelContentDownload: Successfully registered new MBean.
2015-06-20 00:00:51,497 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memoryChannelContentDownload started
2015-06-20 00:00:51,497 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source spooldirSourceContentDownload
2015-06-20 00:00:51,499 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /home/see/seeupdate/blacklistOperRecord
2015-06-20 00:00:51,507 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/home/see/seeupdate/blacklistOperRecord, metaDir=.flumespool, deserializer=LINE
2015-06-20 00:00:51,527 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /home/see/seeupdate/blacklistOperRecord/flume-spooldir-perm-check-713511486621893148.canary
2015-06-20 00:00:51,529 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2015-06-20 00:00:51,530 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: spooldirSourceContentDownload: Successfully registered new MBean.
2015-06-20 00:00:51,530 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: spooldirSourceContentDownload started
2015-06-20 00:00:51,807 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:238)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
2015-06-20 00:01:21,499 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
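
Note on the second stack trace above: `java.nio.charset.MalformedInputException` is thrown while `LineDeserializer` decodes a spooled file, which usually means the file contains bytes that are not valid in the charset the source is using (UTF-8 unless configured otherwise). Below is a minimal sketch of the spooling directory source settings that relate to this. It assumes the files are GBK-encoded text (a guess; nothing in this post states the actual encoding) and that the installed Flume version supports `decodeErrorPolicy`.

```properties
# Assumption: the spooled files are GBK-encoded text. Replace GBK with the real encoding.
mobileAgent.sources.spooldirSourceContentDownload.inputCharset = GBK
# Assumption: this Flume version supports decodeErrorPolicy (documented default: FAIL).
# REPLACE substitutes undecodable bytes instead of stopping the source.
mobileAgent.sources.spooldirSourceContentDownload.decodeErrorPolicy = REPLACE
```

If the files are binary rather than text, the line deserializer is probably the wrong fit regardless of the charset setting.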


Configuration file:
FJXM-DM-TEST-NAG-01 flume/conf> more mobileAgent.conf
mobileAgent.sources = spooldirSourceContentDownload
mobileAgent.channels = memoryChannelContentDownload
mobileAgent.sinks = hdfsSinkContentDownload
############contentDownload####################################
mobileAgent.sources.spooldirSourceContentDownload.type = spooldir
mobileAgent.sources.spooldirSourceContentDownload.spoolDir =/home/see/seeupdate/blacklistOperRecord
mobileAgent.sources.spooldirSourceContentDownload.deletePolicy = immediate
mobileAgent.sources.spooldirSourceContentDownload.fileHeader = false
mobileAgent.sources.spooldirSourceContentDownload.maxBackoff = 60000
mobileAgent.sources.spooldirSourceContentDownload.basenameHeader = true
mobileAgent.sources.spooldirSourceContentDownload.basenameHeaderKey = basename
#mobileAgent.sources.spooldirSourceContentDownload.ignorePattern = ^(.)*\\.tmp$

mobileAgent.channels.memoryChannelContentDownload.type = memory
mobileAgent.channels.memoryChannelContentDownload.keep-alive = 30
mobileAgent.channels.memoryChannelContentDownload.capacity = 2000
mobileAgent.channels.memoryChannelContentDownload.transactionCapacity =1000

mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.directory =/home/see/seeupdate/blacklistOperRecord/temp
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.path = hdfs://10.46.192.173:1220/DataBase/Flume/blacklist/20%y%m%d/contentDownload
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.filePrefix = %{basename}
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileSuffix = .dat
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.writeFormat = Text
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileType = DataStream
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollInterval = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollSize = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollCount = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.batchSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.callTimeout = 30000
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.useLocalTimeStamp = true
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollTimerPoolSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.maxOpenFiles = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.idleTimeout= 60

mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload
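
Note on the `Directory may not be null` error in the log: the stack trace points at `RollingFileSink.configure`, and in the Flume user guide the File Roll Sink's required property is `sink.directory`, not `directory`. The configuration above sets `directory`, which the sink does not appear to read. Below is a sketch of the sink section with that one key renamed. The `sink.rollInterval` line is an optional addition showing the documented default of 30 seconds; it is not part of the original config.

```properties
mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
# Renamed from "directory" to "sink.directory", the key listed for the File Roll Sink.
mobileAgent.sinks.hdfsSinkContentDownload.sink.directory = /home/see/seeupdate/blacklistOperRecord/temp
# Optional, not in the original config: roll to a new output file every 30 seconds (0 disables rolling).
mobileAgent.sinks.hdfsSinkContentDownload.sink.rollInterval = 30
mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
```

Even with this change, `file_roll` writes the events it receives into rolling output files that it names itself, so the original file names and file boundaries are not preserved. That is closer to copying file contents than to moving files.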



1 comment

Alkaloid0515 posted on 2015-6-20 12:52:56
Is this the thread? "Can Flume sink to a local file directory?"

Are the lines in red below custom-defined???
mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload