分享

Flume-NG HDFSEventSink源码阅读

本帖最后由 howtodown 于 2014-5-17 17:10 编辑
问题导读:
1.configure(Context context)的作用是什么?
2.滚动文件的方式有几种?
3.文件类型fileType有几种?
4.如何获取sink的channel和transaction?



阅读代码之前:首先我们说一下Sink的概念
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。  有了上面基本认识之后,在进行下面。




HDFSEventSink是flume中一个很重要的sink,配置文件中type=hdfs。与此sink相关的类都在org.apache.flume.sink.hdfs包中。

HDFSEventSink算是一个比较复杂的sink,包下涉及的源代码文件数多达13个。。。可配置的参数众多。。。希望我能讲清楚。

一、首先依然是看configure(Context context)方法,用来获取配置文件中的配置信息,及初始化一些重要参数  
  1. public void configure(Context context) {  
  2.     this.context = context;
  3.     //HDFS目录路径,必需(eg hdfs://namenode/flume/webdata/)
  4.     filePath = Preconditions.checkNotNull(
  5.         context.getString("hdfs.path"), "hdfs.path is required");
  6.     //在Hdfs目录中生成的文件名字的前缀
  7.     fileName = context.getString("hdfs.filePrefix", defaultFileName);
  8.     //文件后缀,例如.avro,一般不用
  9.     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
  10.     //内部写文件的时候表示正在写的文件的前缀和后缀
  11.     inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
  12.     inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);//默认是.tmp
  13.     String tzName = context.getString("hdfs.timeZone");
  14.     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
  15.     //当前写入的文件滚动间隔,默认30秒生成一个新的文件,0表示不基于时间间隔来滚动
  16.     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
  17.     //以文件大小触发文件滚动,单位字节,0表示不基于文件大小间隔来滚动
  18.     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
  19.     //以写入的事件数触发文件滚动, 0表示不基于事件数大小间隔来滚动
  20.     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
  21.     //事件刷新到HDFS之前的数量
  22.     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
  23.     //超时后关闭无效文件(0 =禁止自动关闭闲置的文件)
  24.     idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
  25.     //压缩编码类型. one of following : gzip, bzip2, lzo, snappy
  26.     String codecName = context.getString("hdfs.codeC");
  27.     //文件格式:当前为SequenceFile, DataStream or CompressedStream。
  28.     //(1)DataStream不压缩输出文件,不能设置codeC选项,(2)CompressedStream需要设置hdfs.codeC的一个可用的编解码器
  29.     fileType = context.getString("hdfs.fileType", defaultFileType);
  30.     //允许打开的文件数。如果超过这个数字,最早的文件被关闭。
  31.     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
  32.     //HDFS的操作允许的毫秒数,如打开,写,刷新,关闭。这个数字应该增加,如果正在发生许多HDFS超时操作。
  33.     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
  34.     //每个HDFS sink用于HDFS io操作的线程数,如打开、写入等操作。
  35.     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
  36.         defaultThreadPoolSize);
  37.     //每个HDFS sink用于调度定时文件滚动的线程数
  38.     rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
  39.         defaultRollTimerPoolSize);
  40.     //安全认证时使用Kerberos user principal for accessing secure HDFS
  41.     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
  42.     //安全认证时使用Kerberos keytab for accessing secure HDFS
  43.     kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
  44.     proxyUserName = context.getString("hdfs.proxyUser", "");  //代理用户
  45.     Preconditions.checkArgument(batchSize > 0,
  46.         "batchSize must be greater than 0");
  47.     if (codecName == null) {  //不压缩数据
  48.       codeC = null;
  49.       compType = CompressionType.NONE;
  50.     } else {    //压缩数据
  51.       codeC = getCodec(codecName);
  52.       // TODO : set proper compression type
  53.       compType = CompressionType.BLOCK;
  54.     }
  55.     // Do not allow user to set fileType DataStream with codeC together
  56.     // To prevent output file with compress extension (like .snappy)
  57.     if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)//如果fileType是DataStream,则不允许压缩
  58.         && codecName != null) {
  59.       throw new IllegalArgumentException("fileType: " + fileType +
  60.           " which does NOT support compressed output. Please don't set codeC" +
  61.           " or change the fileType if compressed output is desired.");
  62.     }
  63.     if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {//如果fileType是压缩类型,则codeC不允许为空
  64.       Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
  65.           + " when fileType is: " + fileType);
  66.     }
  67.     if (!authenticate()) {  //认证
  68.       LOG.error("Failed to authenticate!");
  69.     }
  70.     //时间戳是否四舍五入(如果为true,会影响所有基于时间的转义序列%t除外)
  71.     needRounding = context.getBoolean("hdfs.round", false);
  72.     if(needRounding) {
  73.         //The unit of the round down value - second, minute or hour.
  74.       String unit = context.getString("hdfs.roundUnit", "second");  //滚动时间单位
  75.       if (unit.equalsIgnoreCase("hour")) {
  76.         this.roundUnit = Calendar.HOUR_OF_DAY;
  77.       } else if (unit.equalsIgnoreCase("minute")) {
  78.         this.roundUnit = Calendar.MINUTE;
  79.       } else if (unit.equalsIgnoreCase("second")){
  80.         this.roundUnit = Calendar.SECOND;
  81.       } else {
  82.         LOG.warn("Rounding unit is not valid, please set one of" +
  83.             "minute, hour, or second. Rounding will be disabled");
  84.         needRounding = false;
  85.       }
  86.       //Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
  87.       this.roundValue = context.getInteger("hdfs.roundValue", 1);  //滚动时间大小
  88.       if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){//检查是否符合分、秒数值,0<v<=60
  89.         Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
  90.             "Round value" +
  91.             "must be > 0 and <= 60");
  92.       } else if (roundUnit == Calendar.HOUR_OF_DAY){
  93.         Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,  //检查是否符合时数值0<v<=24
  94.             "Round value" +
  95.             "must be > 0 and <= 24");
  96.       }
  97.     }
  98.     if (sinkCounter == null) {//构造计数器
  99.       sinkCounter = new SinkCounter(getName());
  100.     }
  101.   }
复制代码

上面比较常用的参数有:rollInterval以固定时间间隔滚动文件,rollSize以文件大小为单位滚动文件,rollCount以行数来滚动文件,fileType(有3种SequenceFile(二进制)、DataStream(不能压缩)、CompressedStream(压缩文件)) 

二、接下来是start()方法。
  1. public void start() {
  2.     String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
  3.     callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
  4.             new ThreadFactoryBuilder().setNameFormat(timeoutName).build());  //这个线程池用来将event写入HDFS文件
  5.     String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
  6.     timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
  7.             new ThreadFactoryBuilder().setNameFormat(rollerName).build());  //这个线程池用来滚动文件
  8.     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);  //用来存储文件的绝对路径以及对应的BucketWriter
  9.     sinkCounter.start();
  10.     super.start();
  11.   }
复制代码

start方法主要是初始化两个线程池。  

三、process()方法,是用来处理channel中的event的,非线程安全的,要确保HDFS中的文件是打开的。
  1. public Status process() throws EventDeliveryException {
  2.     Channel channel = getChannel();    //获取对应的channel
  3.     Transaction transaction = channel.getTransaction();//获得事务
  4.     List<BucketWriter> writers = Lists.newArrayList(); //BucketWriter列表
  5.     transaction.begin();
  6.     try {
  7.       int txnEventCount = 0;
  8.       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//批量处理
  9.         Event event = channel.take();    //获取event
  10.         if (event == null) {
  11.           break;
  12.         }
  13.         // reconstruct the path name by substituting place holders
  14.         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
  15.             timeZone, needRounding, roundUnit, roundValue);    //格式化后的HDFS目录
  16.         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
  17.           timeZone, needRounding, roundUnit, roundValue);    //格式化后的文件名
  18.         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;    //要写入的文件的HDFS绝对路径
  19.         BucketWriter bucketWriter = sfWriters.get(lookupPath);    //获取文件的BucketWriter
  20.         // we haven't seen this file yet, so open it and cache the handle
  21.         if (bucketWriter == null) {    //如果没有这个文件
  22.             //根据fileType类型构造HDFSWriter(三种:SequenceFile、DataStream、CompressedStream)
  23.           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);   
  24.           WriterCallback idleCallback = null;
  25.           if(idleTimeout != 0) {
  26.             idleCallback = new WriterCallback() {
  27.               @Override
  28.               public void run(String bucketPath) {
  29.                 sfWriters.remove(bucketPath);
  30.               }
  31.             };
  32.           }
  33.           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
  34.               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
  35.               suffix, codeC, compType, hdfsWriter, timedRollerPool,
  36.               proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
  37.           sfWriters.put(lookupPath, bucketWriter);    //将文件路径和BucketWriter组成K-V,放入sfWriters
  38.         }
  39.         // track the buckets getting written in this transaction
  40.         if (!writers.contains(bucketWriter)) {//如果BucketWriter列表没有正在写的文件——bucketWriter,则加入
  41.           writers.add(bucketWriter);
  42.         }
  43.         // Write the data to HDFS
  44.         append(bucketWriter, event);    //将event写入bucketWriter对应的文件中
  45.       }
  46.       if (txnEventCount == 0) {    //这次事务没有处理任何event
  47.         sinkCounter.incrementBatchEmptyCount();
  48.       } else if (txnEventCount == batchSize) {//一次处理batchSize个event
  49.         sinkCounter.incrementBatchCompleteCount();
  50.       } else {//channel中剩余的events不足batchSize
  51.         sinkCounter.incrementBatchUnderflowCount();
  52.       }
  53.       // flush all pending buckets before committing the transaction
  54.       for (BucketWriter bucketWriter : writers) {    //将所有BucketWriter数据刷新到HDFS中
  55.         flush(bucketWriter);
  56.       }
  57.       transaction.commit();    //提交事务
  58.       if (txnEventCount < 1) {
  59.         return Status.BACKOFF;
  60.       } else {
  61.         sinkCounter.addToEventDrainSuccessCount(txnEventCount);
  62.         return Status.READY;
  63.       }
  64.     } catch (IOException eIO) {
  65.       transaction.rollback();//异常后回滚
  66.       LOG.warn("HDFS IO error", eIO);
  67.       return Status.BACKOFF;
  68.     } catch (Throwable th) {
  69.       transaction.rollback();//异常后回滚
  70.       LOG.error("process failed", th);
  71.       if (th instanceof Error) {
  72.         throw (Error) th;
  73.       } else {
  74.         throw new EventDeliveryException(th);
  75.       }
  76.     } finally {
  77.       transaction.close();//关闭事务
  78.     }
  79.   }
复制代码

1、获取sink的channel和transaction,transaction.begin()是必要的步骤;

  2、循环处理批量的event,如果event==null,说明channel已无数据,则退出循环;

  3、realPath和realName都是格式化后的文件HDFS存储路径及文件名;lookupPath则是要写入的文件完整HDFS路径(目录+文件名);获取该文件对应的BucketWriter对象,要写入的文件及对应的BucketWriter对象需要存入sfWriters这个LinkedHashMap结构中,表示正在写的文件,BucketWriter类用来滚动文件、处理文件格式以及数据的序列化等操作,其实就是负责数据的写的;

  4、如果文件对应的bucketWriter不存在,则文件需要滚动,创建一个BucketWriter对象,只有public方法才是线程安全的。

  创建BucketWriter对象之前需要先构建一个HDFSWriter对象负责写文件,有三种类型:HDFSSequenceFile、HDFSDataStream、HDFSCompressedDataStream。

  WriterCallback idleCallback是用来超时后滚动文件的时候调用的,前提得是配置文件中有配置hdfs.idleTimeout且不为0;

  然后是new 一个BucketWriter对象,这有点复杂稍后说;

  sfWriters.put(lookupPath, bucketWriter)然后就是将文件及对应的bucketWriter对象存入sfWriters中,表示正在写的文件。

  5、这里要说下new BucketWriter对象的事。BucketWriter的构造函数首先是对众多参数赋值,然后isOpen = false,最后是this.writer.configure(context),即对writer对象进行配置。复杂就在这,这个writer对象是什么?它是上面4中所说的HDFSWriter。

  HDFSWriterFactory工厂类会根据配置文件中设置的类型返回相应的HDFSWriter对象,没有配置文件类型的话默认是HDFSSequenceFile。

  HDFSSequenceFile:configure(context)方法会首先获取写入格式writeFormat即参数"hdfs.writeFormat",默认格式是二进制的Writable(HDFSWritableSerializer.Builder.class),还有一个是Text(HDFSTextSerializer.Builder.class),第三个是null;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取writeFormat的所有配置信息serializerContext;然后根据writeFormat和serializerContext构造SequenceFileSerializer的对象serializer。在serializer中并无serializerContext配置的方法,在1.3.0中此处的serializerContext没有任何作用,可能是为以后做的预留。

  HDFSDataStream:configure(context)方法先获取serializerType类型,默认是TEXT(BodyTextEventSerializer.Builder.class),此外还有HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class)、OTHER(null)、AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class)共四种类型;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取serializer的所有配置信息serializerContext。serializer的实例化是在HDFSDataStream.open(String filePath)方法中实现的。此处的serializerContext在BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未用到,可能是做预留,但是FlumeEventAvroEventSerializer在其Builder中用到了,并进行了配置。

  HDFSCompressedDataStream:configure(context)方法和HDFSDataStream.configure(context)是一样的,serializerType的类型是一样的;其他也是一样。serializer的实例化是在HDFSCompressedDataStream.open(String filePath)方法中实现的,调用open(String filePath, CompressionCodec codec,CompressionType cType)来实例化。

  6、如果存储着正在写的bucketWriter的writers列表中没有此bucketWriter,则添加进去,writers的存在是为了统一flush方便,后面会有介绍。

  7、append(bucketWriter, event)这个是让bucketWriter处理event的方法,会使用bucketWriter.append(event)处理。这个方法的代码如下:

  1. public synchronized void append(Event event)
  2.           throws IOException, InterruptedException {
  3.     checkAndThrowInterruptedException();
  4.     if (!isOpen) {
  5.       if(idleClosed) {
  6.         throw new IOException("This bucket writer was closed due to idling and this handle " +
  7.             "is thus no longer valid");
  8.       }
  9.       open();//已经写完一个文件,新建新文件
  10.     }
  11.     // check if it's time to rotate the file
  12.     if (shouldRotate()) {//检查行数、大小是否改完成一个文件
  13.       close();
  14.       open();//新建新文件
  15.     }
  16.     // write the event
  17.     try {
  18.       sinkCounter.incrementEventDrainAttemptCount();
  19.       writer.append(event); // could block写数据
  20.     } catch (IOException e) {
  21.       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
  22.           bucketPath + ") and rethrowing exception.",
  23.           e.getMessage());
  24.       try {
  25.         close();
  26.       } catch (IOException e2) {
  27.         LOG.warn("Caught IOException while closing file (" +
  28.              bucketPath + "). Exception follows.", e2);
  29.       }
  30.       throw e;
  31.     }
  32.     // update statistics
  33.     processSize += event.getBody().length;
  34.     eventCounter++;
  35.     batchCounter++;
  36.     if (batchCounter == batchSize) {
  37.       flush();
  38.     }
  39.   }
复制代码

A、首先会检查当前线程是否中断checkAndThrowInterruptedException();

  B、BucketWriter初次运行时,isOpen=false(表示文件未打开不能写),idleClosed=false,会运行open()——doOpen()。fullFileName是"前缀.时间戳"组成的文件名,从这也可以看出时间戳部分不能更改,也就是HDFS中文件名无法自定义,除非自己定制HDFSSink;另外后缀名和压缩不能同时兼得,即如果没有配置压缩则可以在fullFileName后面添加自定义的后缀(比如后缀为.avro),否则只能添加压缩类型的后缀;bucketPath表示在HDFS中正在写的文件完整名字,这个名字中有标示正在写的文件的前后缀(inUsePrefix、inUseSuffix);targetPath这个是文件写完后的要更改成的完整名字,和bucketPath的区别是没有inUsePrefix、inUseSuffix;然后是根据有无压缩配置信息open此witer,没有压缩:writer.open(bucketPath),有压缩:writer.open(bucketPath, codeC, compType)。需要注意的是当使用Kerberos时,hadoop的RPC操作是非线程安全的包括getFileSystem()操作,open()操作在同一个JVM的同一时刻只能由一个线程使用,因为有可能导致死锁,见FLUME-1231。所以对open进行了同步。另外当在运行flume过程中出现类似异常“java.io.IOException: Callable timed out after 10000 ms on file”和"java.util.concurrent.TimeoutException"时,需要在这个方法上面config.set("dfs.socket.timeout", "3600000")增加超时时间,参考http://blog.csdn.net/yangbutao/article/details/8845025

  writer包含的三类均有两个open方法,一个是对应不压缩的open(String filePath) ,一个是对应压缩的open(String filePath, CompressionCodec codec,CompressionType cType)。

  首先writer若为HDFSSequenceFile,是支持压缩的,open(String filePath)会调用open(filePath, null, CompressionType.NONE)压缩方法,只不过没有压缩类型。压缩open方法先判断是否使用了本地文件系统,然后根据hadoop的配置信息是否支持追加"hdfs.append.support",构造相应的SequenceFile即writer。其中的serializer若为HDFSWritableSerializer则writer的Key为LongWritable类型,Value为BytesWritable二进制类型;若为HDFSTextSerializer,writer的Key为LongWritable类型,Value为Text文本类型。

  其次writer若为HDFSDataStream,是不支持压缩的。它的压缩方法open(String filePath, CompressionCodec codec,CompressionType cType)直接调用非压缩方法open(filePath)。open(filePath)判断是否使用了本地文件系统;然后根据是否支持append操作(获取配置的"hdfs.append.support"参数),构造对应的输出流outStream;然后构造serializer,有三种类型BodyTextEventSerializer、HeaderAndBodyTextEventSerializer、FlumeEventAvroEventSerializer,前两种支持追加,最后一种不支持追加,所以FlumeEventAvroEventSerializer不能将"hdfs.append.support"设置为true。如果支持追加就执行serializer.afterReopen()前两种serializer未实现这个方法(1.3.0),不支持就serializer.afterCreate()前两种也未实现这个方法,第三种则是dataFileWriter.create(getSchema(), getOutputStream())。

  最后writer若为HDFSCompressedDataStream,就是针对压缩的,其open(String filePath)会使用默认的DefaultCodec以及CompressionType.BLOCK来调用压缩open(String filePath, CompressionCodec codec,CompressionType cType)。压缩方法和HDFSDataStream的压缩方法类似,区别有两点一个是serializer的输出流变成压缩输出流了;另一个就是最后加了isFinished = false表示压缩流是否完毕。

  回到BucketWriter,如果rollInterval(按时间滚动文件)不为0,则创建一个Callable,放入timedRollFuture中rollInterval秒之后关闭文件,默认是30s写一个文件,这只是控制文件滚动的3个条件之一;

   isOpen = true表示文件已打开,可以write了。

  C、回到上面7中,shouldRotate()方法会判断文件中的行数和文件的大小是否达到配置文件中的配置,如果任何一个满足条件则可以关闭文件,这是控制文件滚动的3个条件中的两个。close()方法会关闭文件,再清理俩线程池及一些其他的清理工作,及改名(将.tmp文件改名),再open()就又到了上面B中所说的。

  D、writer.append(event)这是向HDFS中写数据的地方。这里又要分很多讨论了,因为writer有三类。

  writer为HDFSSequenceFile:append(event)方法,会先通过serializer.serialize(e)把event处理成一个Key和一个Value。

  (1)serializer为HDFSWritableSerializer时,则Key会是event.getHeaders().get("timestamp"),如果没有"timestamp"的Headers则使用当前系统时间System.currentTimeMillis(),然后将时间封装成LongWritable;Value是将event.getBody()封装成BytesWritable,代码是bytesObject.set(e.getBody(), 0, e.getBody().length);

  (2)serializer为HDFSTextSerializer时,Key和上述HDFSWritableSerializer一样;Value会将event.getBody()封装成Text,代码是textObject.set(e.getBody(), 0, e.getBody().length)。

  writer.append(event)中会将Key和Value,writer.append(record.getKey(), record.getValue())。

  writer为HDFSDataStream:append(event)方法直接调用serializer.write(e)。

  (1)serializer为BodyTextEventSerializer,则其write(e)方法会将e.getBody()写入输出流,并根据配置再写入一个"\n";

  (2)serializer为HeaderAndBodyTextEventSerializer,则其write(e)方法会将e.getHeaders() + " "(注意此空格)和e.getBody()写入输出流,并根据配置再写入一个"\n";

  (3)serializer为FlumeEventAvroEventSerializer,则其write(e)方法会将event整体写入dataFileWriter。

  writer为HDFSCompressedDataStream:append(event)方法会首先判断是否完成一个阶段的压缩isFinished,如果是则更新压缩输出流的状态,并isFinished=false,否则剩下的执行和HDFSDataStream.append(event)相同。

  E、是做一些统计工作processSize是统计文件大小的;eventCounter是统计文件行数的;batchCounter是统计最近一次flush之后的处理的event数;

  F、如果处理的event数达到batchSize则刷新到HDFS中,flush()。flush()方法会首先执行writer.sync()即写入HDFS,然后清空batchCounter表明这次batch已经完成,可以准备下次的。涉及到writer就会涉及很多写入类型:

  writer为HDFSSequenceFile:sync()方法执行SequenceFile.Writer.syncFs()将数据写入HDFS中;

  writer为HDFSDataStream:sync()方法执行

  writer为HDFSCompressedDataStream:sync()方法先执行serializer.flush():只有FlumeEventAvroEventSerializer的flush()方法也有实现dataFileWriter.flush(),其他俩BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未实现flush()方法。然后执行outStream.flush()和outStream.sync()将数据刷新至HDFS中。

  如果idleTimeout>0,表示文件超时时间,超时后就成为无效文件需要关闭(默认是0不允许关闭的),构造一个Callable对象idleAction执行内容是:close()方法,idleClosed = true表示超时关闭了这个bucketwriter,而且onIdleCallback.run(onIdleCallbackPath)会将onIdleCallbackPath从HDFSEventSink.sfWriters中删除对应对应的bucketwriter,表示这个文件已经写完了。然后将这个idleAction放入timedRollerPool中idleTimeout秒后执行。

  8、回到HDFSEventSink.process()方法中,会根据这次事务处理的event数量更新相应的计数器;

  9、遍历writers,挨个刷新BucketWriter至HDFS;

  10、transaction.commit();//提交事务

  11、transaction.rollback();//异常后回滚

  12、transaction.close();//关闭事务

  四、stop()方法。首先会遍历sfWriters,挨个close(BucketWriter):BucketWriter.close()方法,如果isOpen=true表示文件还处于打开状态,则writer.close()(这里的writer就不分情况了,HDFSSequenceFile就直接writer.close();其他俩都是先flush(好些都没实现)再beforClose(好些都没实现)输出流再flush、sync、close),BucketWriter.close()方法方法接下来关闭俩线程池以及改名等清理操作。HDFSEventSink的stop()方法接下来是关闭俩线程池,清理一些数据比如sfWriters.clear()。



ps:1、BucketWriter中的public方法都是线程安全的,包括append、close、flush三个均是同步方法,会调用相应的do方法,做具体的操作。

  2、callWithTimeout方法需要注意,在HDFSEventSink中多次用到这个方法:append、flush、close,这个方法会将对应的Callable放入callTimeoutPool线程池中执行,并等待callTimeout(默认是10000) ms返回结果。



问题1:WriterLinkedHashMap的sfWriters除了设置hdfs.idleTimeout且>0时才会从sfWriters中remove掉超时的bucketwriter,其它地方并没有发现remove操作,那么以后随着写入文件的增多sfWriters会不会始终增大?

解:肯定不会啊。别忘了还有一个"hdfs.maxOpenFiles"参数默认是5000,追踪发现HDFSEventSink内部静态类WriterLinkedHashMap继承了LinkedHashMap,并重写了removeEldestEntry方法,这个方法在sfWriters.put时总会调用,当sfWriters.size()>maxOpenFiles时就是自动清理之时了。maxOpenFiles就是sfWriters得最大容量。



这次的sink比较复杂,希望我写的大伙能够看懂,期间还有一些细节不太清楚,不过不影响整体的理解。

不解1:bucketwriter类中的doOpen方法中hadoop的RPC线程非安全,说是可以从FLUME-1231这得到解释

不解2:同样doOpen方法中有说“Need to get reference to FS using above config before underlying writer does in order to avoid shutdown hook & IllegalStateExceptions”这里也表示疑问,为什么这么说?

不解3:为什么HDFSWriter的3个实现类的open()方法中,均考虑了conf.getBoolean("hdfs.append.support", false) == true?一个是可追加的一个是不可追加的。但是都是一个SequenceFile.Writer或者FSDataOutputStream,尤其是在HDFSSequenceFile中的writer能不能追加似乎根本没什么区别,充其量是一个writer的参数是FSDataOutputStream,另外一个则不是,其他俩好歹还有需要设置appending=true用来判断是否可重复打开但也是有点牵强,都可以合二为一,但是为什么不那么做呢?

不解4:BucketPath.escapeString这个方法还没搞懂,导致格式化的结果不甚明了。。。哎

解4:escapeString这个会将目录和文件名前缀进行格式化,如果这两个变量中存在正则表达式,则event.headers中必须要有timestamp,也就是source的拦截器使用TimestampInterceptor或者自己写进headers中,会将正则部分格式化成时间对应的部分。realName是对配置文件中的前缀名格式化后的前缀名。会随着event的时间戳来进行变动,从而控制着新文件的滚动。另外由于三个控制文件滚动的参数使得文件滚动较快,写完一个文件后后续的还指向这个realName,则在bucketWriter.append中会根据文件是否写完从新创建一个前缀相同但是时间戳不同的文件;如果三个参数控制文件滚动较慢,realName变化比较快的话不能满足大小和行数就只能等超时的或者时间滚动这个参数设置了,最甚者文件永远不关闭等待满足条件,所有控制文件滚动的三个参数最好不要都为0,尤其是时间参数。这个HDFSEventSink还可以设置时区"hdfs.timeZone",会在格式化HDFS目录和前缀的时候用到,用的是java自带的java.util.TimeZone类,根据需要可以配置,这个功能应该是收集距离较远异地或者异国时用到。


来源:玖疯




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条