立即注册 登录
About云-梭伦科技 返回首页

JustEnding的个人空间 https://aboutyun.com/?14809 [收藏] [复制] [分享] [RSS]

日志

有关flume-ng自定义sink的创建-打包-引用

热度 2已有 10435 次阅读2015-1-6 13:58 |个人分类:Flume

最近我学习了一下flume-ng的相关知识,想做一个监控文件内容,然后每隔一个周期把文件内容经过处理放入mysql数据库的功能。我想自己写一个sink,有没有哪位高手可以详细介绍一下新建sink的详细过程。
我的过程是,复制apache-flume-1.5.0-src/flume-ng-sinks中的flume-ng-hbase-sink,重命名为flume-ng-mymessage-sink。然后进入此文件夹,删除src中的test文件夹,并修改文件夹中的pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>flume-ng-sinks</artifactId>
    <groupId>org.apache.flume</groupId>
    <version>1.5.0</version>
  </parent>
  <groupId>com.test.message</groupId>
  <artifactId>flume-ng-mymessage-sink</artifactId>
  <version>1.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </plugins>
  </build>
  <dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
  </dependencies>
</project>

然后在src/main/java/com/test/message/文件夹中,先清空文件夹,再新建文件,写入如下内容
package com.test.message;

import java.util.*;
import java.util.logging.Logger;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class testmessage extends AbstractSink implements Configurable{
    private Logger LOG = (Logger) LoggerFactory.getLogger(testmessage.class);
    private String hostname;
    private String port;
    private String username;
    private String password;
    private int size;

    public testmessage(){
        LOG.info("Case_1 start...");
    }

    @Override
    public void configure(Context context){
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set !!");

        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set !!");

        username = context.getString("username");
        Preconditions.checkNotNull(username, "username must be set !!");

        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set !!");

        size = context.getInteger("size");
        Preconditions.checkNotNull(size, "size must be set !!");
    }

    @Override
    public void start(){
        LOG.info("function start running");
        LOG.info("hostname: " + hostname);
        LOG.info("port: " + port);
        LOG.info("username: " + username);
        LOG.info("password: " + password);
    }

    @Override
    public void stop(){
        LOG.info("function stop running");
    }

    public int size(){
        return size;
    }

    @Override
    public Status process() throws EventDeliveryException{
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<String> actions = Lists.newArrayList();
        transaction.begin();
        try{
            for(int n=0; n < size; n++){
                event = channel.take();
                if(event != null){
                    content = new String(event.getBody());
                    LOG.info("content:"+content);
                    actions.add(content);
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }
            if(actions.size() > 0){
                for(String temp : actions){
                    LOG.info(temp+" ");
                }
            }
        } catch (Throwable e) {
            ((org.slf4j.Logger) LOG).error("Fail to show");
        } finally {
            transaction.close();
        }
        return result;
    }
}

最后在flume-ng-mymessage-sink文件夹下使用mvn install -DskipTests,报错:
[ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.7:check (verify.rat) on project flume-ng-mymessage-sink: Too many unapproved licenses: 2 -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
求解???

路过

雷人

握手

鲜花

鸡蛋

发表评论 评论 (12 个评论)

回复 nextuser 2015-1-6 18:27
删除flume-ng-core的\src\test\resources\event.txt
回复 JustEnding 2015-1-6 19:48
nextuser: 删除flume-ng-core的\src\test\resources\event.txt
我的flume-ng版本是1.5.0的,源码文件夹中你提到的那个目录并没有event.txt,怎么办呢,里面只有jettykeystore;server.p12;test_command.ps1;TestResettableFileInputStream_1.avro;truststore.jks
log4j.properties;syslog_event.avsc;test_command.txt;TestResettableFileInputStream_1.truncated.avro
回复 nextuser 2015-1-7 15:36
JustEnding: 我的flume-ng版本是1.5.0的,源码文件夹中你提到的那个目录并没有event.txt,怎么办呢,里面只有jettykeystore;server.p12;test_command.ps1;TestResettableF ...
可以坐下备份,把一些没用的删除试试
回复 JustEnding 2015-1-7 17:03
nextuser: 可以坐下备份,把一些没用的删除试试
谢谢,我今天下午调试出来了,是java文件写错了,util包的log和slf4j的log包混用没看清楚。很感谢你的帮忙。
回复 nextuser 2015-1-7 22:36
   解决就好
回复 JustEnding 2015-1-8 17:38
nextuser:    解决就好
回复 vanking 2015-1-15 09:19
楼主是怎么解决的,我也遇到了这样的问题
回复 JustEnding 2015-1-15 09:39
vanking: 楼主是怎么解决的,我也遇到了这样的问题
我的问题是java类里面的log用错了,util包里面的log和slf4j包里面的log混用了。你可以检查一下,我觉得像这样的错误,很多是因为java类里面有错误
回复 vanking 2015-1-15 11:30
楼主再问个问题,你是怎样把自定义的sink集合到flume中的呢?就是你自定义sink,如何在配置文件中调用?麻烦说详细点,谢谢哈
回复 JustEnding 2015-1-15 12:06
vanking: 楼主再问个问题,你是怎样把自定义的sink集合到flume中的呢?就是你自定义sink,如何在配置文件中调用?麻烦说详细点,谢谢哈
通过maven创建工程→写入自定义sink类→打包→从工程文件夹下的target文件夹下拿出jar文件移动到flume文件夹下的plugins.d/custom/lib/目录下(没有就自己新建,custom文件夹可以自己命名,详见官网)→在配置文件的sink.type中写入自定义类的路径,格式为(包名.类名)。大致是这样
回复 vanking 2015-1-15 14:26
可以了,谢谢楼主!楼主可以加个QQ不?可以一起讨论
回复 vanking 2015-1-22 16:43
不知道你现在有空没,打扰了! 我现在要监控一个目录中的日志文件,但是这些日志文件都不断的被追加,也就是说这些日志文件有一些是增量文件,增至一定量后日志文件就会滚动生成一个新的文件,对于这养的日志监控,spool肯定是不行的,exec的话有可能有数据丢失,所以有什么更好的办法没?

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条