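I recently started learning flume-ng and want to build a feature that monitors a file's contents and, at a fixed interval, processes them and writes the result into a MySQL database. I would like to write my own sink for this. Could someone explain in detail how to create a new sink?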
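Here is what I did: I copied flume-ng-hbase-sink from apache-flume-1.5.0-src/flume-ng-sinks and renamed the copy to flume-ng-mymessage-sink. Then I went into that folder, deleted the test folder under src, and changed the folder's pom.xml to the following: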
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>flume-ng-sinks</artifactId>
    <groupId>org.apache.flume</groupId>
    <version>1.5.0</version>
  </parent>
  <groupId>com.test.message</groupId>
  <artifactId>flume-ng-mymessage-sink</artifactId>
  <version>1.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
  </dependencies>
</project>
Then, in the src/main/java/com/test/message/ folder, I first emptied the folder and then created a new file with the following content:
package com.test.message;

import java.util.List;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class testmessage extends AbstractSink implements Configurable {
    // Use the SLF4J Logger type; casting it to java.util.logging.Logger fails at runtime.
    private static final Logger LOG = LoggerFactory.getLogger(testmessage.class);

    private String hostname;
    private String port;
    private String username;
    private String password;
    private int size;

    public testmessage() {
        LOG.info("Case_1 start...");
    }

    @Override
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set !!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set !!");
        username = context.getString("username");
        Preconditions.checkNotNull(username, "username must be set !!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set !!");
        // Read into an Integer first so that a missing value reaches the check
        // instead of failing while being unboxed into the int field.
        Integer configuredSize = context.getInteger("size");
        Preconditions.checkNotNull(configuredSize, "size must be set !!");
        size = configuredSize;
    }

    @Override
    public void start() {
        LOG.info("function start running");
        LOG.info("hostname: " + hostname);
        LOG.info("port: " + port);
        LOG.info("username: " + username);
        LOG.info("password: " + password);
        // Let AbstractSink record the START lifecycle state.
        super.start();
    }

    @Override
    public void stop() {
        LOG.info("function stop running");
        // Let AbstractSink record the STOP lifecycle state.
        super.stop();
    }

    public int size() {
        return size;
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;
        List<String> actions = Lists.newArrayList();
        transaction.begin();
        try {
            // Take at most `size` events per call.
            for (int n = 0; n < size; n++) {
                event = channel.take();
                if (event != null) {
                    content = new String(event.getBody());
                    LOG.info("content:" + content);
                    actions.add(content);
                } else {
                    // The channel is empty: ask the runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
            }
            if (actions.size() > 0) {
                for (String temp : actions) {
                    LOG.info(temp + " ");
                }
            }
            // Commit so that the taken events are removed from the channel.
            transaction.commit();
        } catch (Throwable e) {
            // Roll back so that the events stay in the channel for a retry.
            transaction.rollback();
            LOG.error("Fail to show", e);
            result = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return result;
    }
}
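As a side note, the batching rule in process() can be tried without a running agent. What follows is only a minimal sketch under stated assumptions: a plain java.util.Queue stands in for the Flume channel, and the names BatchDrainSketch, drain and Result are hypothetical and do not exist in Flume. It mirrors the loop in the sink: take up to size items, and signal a back-off as soon as the source runs dry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class BatchDrainSketch {

    /** Hypothetical result holder: the batch that was taken plus a back-off flag. */
    public static final class Result {
        public final List<String> batch;
        public final boolean backoff;

        Result(List<String> batch, boolean backoff) {
            this.batch = batch;
            this.backoff = backoff;
        }
    }

    /**
     * Takes at most size items from the queue. The queue is a stand-in for
     * channel.take(): poll() returns null when nothing is left, which is
     * treated like an empty channel and reported as a back-off.
     */
    public static Result drain(Queue<String> queue, int size) {
        List<String> batch = new ArrayList<>();
        boolean backoff = false;
        for (int n = 0; n < size; n++) {
            String item = queue.poll();
            if (item == null) {
                backoff = true;
                break;
            }
            batch.add(item);
        }
        return new Result(batch, backoff);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        Result first = drain(queue, 2);   // enough items for a full batch
        Result second = drain(queue, 2);  // the queue runs dry mid-batch
        System.out.println(first.batch + " backoff=" + first.backoff);
        System.out.println(second.batch + " backoff=" + second.backoff);
    }
}
```

In this sketch a partially filled batch is still returned together with the back-off flag, which matches what the sink does when channel.take() returns null partway through the loop.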
Finally, I ran mvn install -DskipTests in the flume-ng-mymessage-sink folder, and it failed with:
[ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.7:check (verify.rat) on project flume-ng-mymessage-sink: Too many unapproved licenses: 2 -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
How can I solve this?
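For context on the error: apache-rat-plugin is a release-audit check that counts files without a recognized license header, so "Too many unapproved licenses: 2" most likely points at two files in the new module that lack one, possibly the pom.xml and the new Java file, neither of which carries a header in the listings here. The report that the plugin writes under target/ should name the files. One way to address it, assuming the files are meant to be under the Apache License 2.0, is to add the license notice to each flagged file. The sketch below is a hypothetical helper, not part of Flume or RAT: the names LicenseHeaderSketch and ensureHeader are made up for illustration, and the marker string it searches for is an assumption about what counts as "already licensed".

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class LicenseHeaderSketch {

    // Assumption: this phrase is taken as the sign that a header is already present.
    public static final String MARKER = "Licensed under the Apache License, Version 2.0";

    // Apache License 2.0 notice wrapped in a Java block comment.
    public static final String HEADER =
        "/*\n"
        + " * Licensed under the Apache License, Version 2.0 (the \"License\");\n"
        + " * you may not use this file except in compliance with the License.\n"
        + " * You may obtain a copy of the License at\n"
        + " *\n"
        + " *     http://www.apache.org/licenses/LICENSE-2.0\n"
        + " *\n"
        + " * Unless required by applicable law or agreed to in writing, software\n"
        + " * distributed under the License is distributed on an \"AS IS\" BASIS,\n"
        + " * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
        + " * See the License for the specific language governing permissions and\n"
        + " * limitations under the License.\n"
        + " */\n";

    /** Returns the source unchanged if it contains the marker, else prepends HEADER. */
    public static String ensureHeader(String source) {
        return source.contains(MARKER) ? source : HEADER + source;
    }

    /** Rewrites a Java source file in place so that it starts with the header. */
    public static void ensureHeader(Path javaFile) throws IOException {
        String source = new String(Files.readAllBytes(javaFile), StandardCharsets.UTF_8);
        String updated = ensureHeader(source);
        if (!updated.equals(source)) {
            Files.write(javaFile, updated.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // Demonstrate on a temporary file instead of touching real sources.
        Path tmp = Files.createTempFile("testmessage", ".java");
        Files.write(tmp, "package com.test.message;\n".getBytes(StandardCharsets.UTF_8));
        ensureHeader(tmp);
        System.out.println(new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8));
        Files.delete(tmp);
    }
}
```

The helper only handles Java sources. A pom.xml would need the same notice inside an XML comment placed after the XML declaration, if there is one. Whether the check then passes depends on how the plugin is configured in the parent pom.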