我flume 的目的是:检测本地目录新文件(/home/see/seeupdate/blacklistOperRecord),将其搬到本地目录(/home/see/seeupdate/blacklistOperRecord/temp)
执行语句
/home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &
报错(现在会在目标目录生成空文件 /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-3 )
FJXM-DM-TEST-NAG-01 flume/conf> Info: Sourcing environment configuration script /home/see/flume/flume/conf/flume-env.sh
+ exec /home/see/seeupdate/jdk1.6.0_16/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/see/flume/flume/conf:/home/see/flume/flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -n mobileAgent -f /home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 03:56:40,560 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2015-06-20 03:56:40,567 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2015-06-20 03:56:40,571 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:56:40,572 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for hdfsSinkContentDownload: type
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfsSinkContentDownload Agent: mobileAgent
2015-06-20 03:56:40,583 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: mobileAgent, initial-configuration: AgentConfiguration[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{type=file_roll, channel=memoryChannelContentDownload, sink.directory=/home/see/seeupdate/blacklistOperRecord/temp} }}
2015-06-20 03:56:40,587 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel memoryChannelContentDownload
2015-06-20 03:56:40,594 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: hdfsSinkContentDownload using FILE_ROLL
2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for mobileAgent
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{type=file_roll, channel=memoryChannelContentDownload, sink.directory=/home/see/seeupdate/blacklistOperRecord/temp} }}
2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:memoryChannelContentDownload
2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks hdfsSinkContentDownload
2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources spooldirSourceContentDownload
2015-06-20 03:56:40,598 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [mobileAgent]
2015-06-20 03:56:40,598 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2015-06-20 03:56:40,606 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel memoryChannelContentDownload type memory
2015-06-20 03:56:40,610 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel memoryChannelContentDownload
2015-06-20 03:56:40,611 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source spooldirSourceContentDownload, type spooldir
2015-06-20 03:56:40,621 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfsSinkContentDownload, type: file_roll
2015-06-20 03:56:40,626 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel memoryChannelContentDownload connected to [spooldirSourceContentDownload, hdfsSinkContentDownload]
2015-06-20 03:56:40,633 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{spooldirSourceContentDownload=EventDrivenSourceRunner: { source:Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord } }} sinkRunners:{hdfsSinkContentDownload=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d58f9d3 counterGroup:{ name:null counters:{} } }} channels:{memoryChannelContentDownload=org.apache.flume.channel.MemoryChannel{name: memoryChannelContentDownload}} }
2015-06-20 03:56:40,644 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel memoryChannelContentDownload
2015-06-20 03:56:40,703 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memoryChannelContentDownload: Successfully registered new MBean.
2015-06-20 03:56:40,704 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memoryChannelContentDownload started
2015-06-20 03:56:40,704 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:160)] Waiting for channel: memoryChannelContentDownload to start. Sleeping for 500 ms
2015-06-20 03:56:41,204 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink hdfsSinkContentDownload
2015-06-20 03:56:41,206 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source spooldirSourceContentDownload
2015-06-20 03:56:41,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:104)] Starting org.apache.flume.sink.RollingFileSink{name:hdfsSinkContentDownload, channel:memoryChannelContentDownload}...
2015-06-20 03:56:41,207 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /home/see/seeupdate/blacklistOperRecord
2015-06-20 03:56:41,208 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: hdfsSinkContentDownload: Successfully registered new MBean.
2015-06-20 03:56:41,208 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: hdfsSinkContentDownload started
2015-06-20 03:56:41,211 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:136)] RollingFileSink hdfsSinkContentDownload started.
2015-06-20 03:56:41,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2015-06-20 03:56:41,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:56:41,227 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/home/see/seeupdate/blacklistOperRecord, metaDir=.flumespool, deserializer=LINE
2015-06-20 03:56:41,258 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /home/see/seeupdate/blacklistOperRecord/flume-spooldir-perm-check-7155634312227911932.canary
2015-06-20 03:56:41,259 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2015-06-20 03:56:41,260 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: spooldirSourceContentDownload: Successfully registered new MBean.
2015-06-20 03:56:41,260 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: spooldirSourceContentDownload started
2015-06-20 03:56:41,530 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:238)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
2015-06-20 03:57:11,207 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:57:11,210 (rollingFileSink-roller-13-0) [DEBUG - org.apache.flume.sink.RollingFileSink$1.run(RollingFileSink.java:127)] Marking time to rotate file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,238 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:142)] Time to rotate /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,238 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:145)] Closing file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,239 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:41,208 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:57:41,210 (rollingFileSink-roller-13-0) [DEBUG - org.apache.flume.sink.RollingFileSink$1.run(RollingFileSink.java:127)] Marking time to rotate file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,239 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:142)] Time to rotate /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,240 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:145)] Closing file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,240 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-3
配置文件
mobileAgent.sources = spooldirSourceContentDownload
mobileAgent.channels = memoryChannelContentDownload
mobileAgent.sinks = hdfsSinkContentDownload
############contentDownload####################################
mobileAgent.sources.spooldirSourceContentDownload.type = spooldir
mobileAgent.sources.spooldirSourceContentDownload.spoolDir =/home/see/seeupdate/blacklistOperRecord
mobileAgent.sources.spooldirSourceContentDownload.deletePolicy = immediate
mobileAgent.sources.spooldirSourceContentDownload.fileHeader = false
mobileAgent.sources.spooldirSourceContentDownload.maxBackoff = 60000
mobileAgent.sources.spooldirSourceContentDownload.basenameHeader = true
mobileAgent.sources.spooldirSourceContentDownload.basenameHeaderKey = basename
mobileAgent.channels.memoryChannelContentDownload.type = memory
mobileAgent.channels.memoryChannelContentDownload.keep-alive = 30
mobileAgent.channels.memoryChannelContentDownload.capacity = 2000
mobileAgent.channels.memoryChannelContentDownload.transactionCapacity =1000
mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.sink.directory =/home/see/seeupdate/blacklistOperRecord/temp
mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload
原目录文件
FJXM-DM-TEST-NAG-01 seeupdate/blacklistOperRecord> ll
total 36
-rw-r--r-- 1 see seegroup 10 2015-06-20 15:54 adj.txt
-rwx--x--x 1 see seegroup 27606 2015-06-19 19:30 s21211.txt
drwxr-xr-x 2 see seegroup 4096 2015-06-20 16:02 temp
|