这是错误:
18/08/16 23:44:05 INFO node.AbstractConfigurationProvider: Created channel c1
18/08/16 23:44:05 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
18/08/16 23:44:06 INFO api.MorphlineContext: Importing commands
18/08/16 23:44:07 ERROR node.PollingPropertiesFileConfigurationProvider: Unhandled error
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at sun.misc.IOUtils.readFully(IOUtils.java:60)
at java.util.jar.JarFile.getBytes(JarFile.java:390)
at java.util.jar.JarFile.getManifestFromReference(JarFile.java:180)
at java.util.jar.JarFile.getManifest(JarFile.java:167)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanJar(ClassPath.java:336)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanFrom(ClassPath.java:286)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scan(ClassPath.java:274)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath.from(ClassPath.java:82)
at org.kitesdk.morphline.api.MorphlineContext.getTopLevelClasses(MorphlineContext.java:149)
at org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:91)
at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:43)
at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.<init>(MorphlineInterceptor.java:135)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor.<init>(MorphlineInterceptor.java:55)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder.build(MorphlineInterceptor.java:112)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder.build(MorphlineInterceptor.java:103)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:80)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
这是我的 Flume 配置（fm）:
agent1.sources = r1
agent1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = cat /home/training/Desktop/llo
agent1.sources.r1.channels = c1
agent1.sources.r1.interceptors=i1 i2
agent1.sources.r1.interceptors.i2.type=regex_filter
agent1.sources.r1.interceptors.i2.regex=(.*)installed(.*)
agent1.sources.r1.interceptors.i1.type=org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent1.sources.r1.interceptors.i1.morphlineFile=/home/training/Desktop/morphline.conf
agent1.sources.r1.interceptors.i1.morphlineId=morphline
# kafka memory
agent1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.c1.brokerList = localhost:9092
agent1.channels.c1.zookeeperConnect=localhost:2181
agent1.channels.c1.topic = format
agent1.channels.c1.kafka.consumer.group.id = format-consumer
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000
agent1.channels.c1.parseAsFlumeEvent = true
Flume 配置 2（fm2）:
a1.sinks = k1
a1.channels = c1
# kafka memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = localhost:9092
a1.channels.c1.zookeeperConnect=localhost:2181
a1.channels.c1.topic = format
a1.channels.c1.kafka.consumer.group.id = fm
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.parseAsFlumeEvent = true
# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /home/training/Desktop/ttt/q/ds=%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
#a1.sinks.k1.serializer.schemaURL = hdfs://localhost:8020/user/schema/softschema.avsc
morphline.conf:
morphlines: [
{
id: morphline
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
commands: [
{
readLine {
charset: UTF-8
}
}
{
split {
inputField: message
outputFields: [date, time, soft, version]
separator: " "
isRegex: false
addEmptyStrings: false
trim: true
}
}
{
split {
inputField: soft
outputFields: [mes,plat]
separator: ":"
isRegex: false
addEmptyStrings: false
trim: true
}
}
{
split {
inputField: mes
outputFields: [state,status,name]
separator: ","
isRegex: false
addEmptyStrings: false
trim: true
}
}
{
addValues {
timestamp: "@{date} @{time}"
flume.avro.schema.url:"file:/home/training/Desktop/schemaa"
}
}
{
convertTimestamp {
field : timestamp
inputFormats : ["yyyy-MM-dd HH:mm:ss"]
outputFormat : unixTimeInMillis
}
}
{
logInfo {
format : "timestamp: {}, record: {}"
args : ["@{timestamp}", "@{}"]
}
}
{
toAvro {
schemaFile:/home/training/Desktop/schemaa
}
}
{
writeAvroToByteArray {
format : containerlessBinary
codec : snappy
}
}
]
}
]
|