Flume-NG源码阅读之HDFSEventSink

常睿范
2023-12-01

HDFSEventSink是flume中一个很重要的sink,配置文件中type=hdfs。与此sink相关的类都在org.apache.flume.sink.hdfs包中。

HDFSEventSink算是一个比较复杂的sink,包下涉及的源代码文件数多达13个。。。可配置的参数众多。。。希望我能讲清楚。

一、首先依然是看configure(Context context)方法,用来获取配置文件中的配置信息,及初始化一些重要参数   

 1 public void configure(Context context) {  
 2     this.context = context;
  3     //HDFS目录路径,必需(eg hdfs://namenode/flume/webdata/)
  4     filePath = Preconditions.checkNotNull(
  5         context.getString("hdfs.path"), "hdfs.path is required");
  6     //在Hdfs目录中生成的文件名字的前缀
  7     fileName = context.getString("hdfs.filePrefix", defaultFileName);
  8     //文件后缀,例如.avro,一般不用
  9     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
 10     //内部写文件的时候表示正在写的文件的前缀和后缀
 11     inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
 12     inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);//默认是.tmp
 13     String tzName = context.getString("hdfs.timeZone");
 14     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
 15     //当前写入的文件滚动间隔,默认30秒生成一个新的文件,0表示不基于时间间隔来滚动
 16     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
 17     //以文件大小触发文件滚动,单位字节,0表示不基于文件大小间隔来滚动
 18     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
 19     //以写入的事件数触发文件滚动, 0表示不基于事件数大小间隔来滚动
 20     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
 21     //事件刷新到HDFS之前的数量
 22     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
 23     //超时后关闭无效文件(0 =禁止自动关闭闲置的文件)
 24     idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
 25     //压缩编码类型. one of following : gzip, bzip2, lzo, snappy
 26     String codecName = context.getString("hdfs.codeC");
 27     //文件格式:当前为SequenceFile, DataStream or CompressedStream。
 28     //(1)DataStream不压缩输出文件,不能设置codeC选项,(2)CompressedStream需要设置hdfs.codeC的一个可用的编解码器
 29     fileType = context.getString("hdfs.fileType", defaultFileType);
 30     //允许打开的文件数。如果超过这个数字,最早的文件被关闭。
 31     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
 32     //HDFS的操作允许的毫秒数,如打开,写,刷新,关闭。这个数字应该增加,如果正在发生许多HDFS超时操作。
 33     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
 34     //每个HDFS sink用于HDFS io操作的线程数,如打开、写入等操作。
 35     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
 36         defaultThreadPoolSize);
 37     //每个HDFS sink用于调度定时文件滚动的线程数
 38     rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
 39         defaultRollTimerPoolSize);
 40     //安全认证时使用Kerberos user principal for accessing secure HDFS
 41     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
 42     //安全认证时使用Kerberos keytab for accessing secure HDFS
 43     kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
 44     proxyUserName = context.getString("hdfs.proxyUser", "");  //代理用户
 45 
 46     Preconditions.checkArgument(batchSize > 0,
 47         "batchSize must be greater than 0");
 48     if (codecName == null) {  //不压缩数据
 49       codeC = null;
 50       compType = CompressionType.NONE;
 51     } else {    //压缩数据
 52       codeC = getCodec(codecName);
 53       // TODO : set proper compression type
 54       compType = CompressionType.BLOCK;
 55     }
 56 
 57     // Do not allow user to set fileType DataStream with codeC together
 58     // To prevent output file with compress extension (like .snappy)
 59     if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)//如果fileType是DataStream,则不允许压缩
 60         && codecName != null) {
 61       throw new IllegalArgumentException("fileType: " + fileType +
 62           " which does NOT support compressed output. Please don't set codeC" +
 63           " or change the fileType if compressed output is desired.");
 64     }
 65 
 66     if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {//如果fileType是压缩类型,则codeC不允许为空
 67       Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
 68           + " when fileType is: " + fileType);
 69     }
 70 
 71     if (!authenticate()) {  //认证
 72       LOG.error("Failed to authenticate!");
 73     }
 74     //时间戳是否四舍五入(如果为true,会影响所有基于时间的转义序列%t除外)
 75     needRounding = context.getBoolean("hdfs.round", false);
 76 
 77     if(needRounding) {
 78         //The unit of the round down value - second, minute or hour.
 79       String unit = context.getString("hdfs.roundUnit", "second");  //滚动时间单位
 80       if (unit.equalsIgnoreCase("hour")) {
 81         this.roundUnit = Calendar.HOUR_OF_DAY;
 82       } else if (unit.equalsIgnoreCase("minute")) {
 83         this.roundUnit = Calendar.MINUTE;
 84       } else if (unit.equalsIgnoreCase("second")){
 85         this.roundUnit = Calendar.SECOND;
 86       } else {
 87         LOG.warn("Rounding unit is not valid, please set one of" +
 88             "minute, hour, or second. Rounding will be disabled");
 89         needRounding = false;
 90       }
 91       //Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
 92       this.roundValue = context.getInteger("hdfs.roundValue", 1);  //滚动时间大小
 93       if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){//检查是否符合分、秒数值,0<v<=60
 94         Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
 95             "Round value" +
 96             "must be > 0 and <= 60");
 97       } else if (roundUnit == Calendar.HOUR_OF_DAY){
 98         Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,  //检查是否符合时数值0<v<=24
 99             "Round value" +
100             "must be > 0 and <= 24");
101       }
102     }
103 
104     if (sinkCounter == null) {//构造计数器
105       sinkCounter = new SinkCounter(getName());
106     }
107   }

上面比较常用的参数有:rollInterval以固定时间间隔滚动文件,rollSize以文件大小为单位滚动文件,rollCount以行数来滚动文件,fileType(有3种SequenceFile(二进制)、DataStream(不能压缩)、CompressedStream(压缩文件))

二、接下来是start()方法。

 1 public void start() {
 2     String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
 3     callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
 4             new ThreadFactoryBuilder().setNameFormat(timeoutName).build());  //这个线程池用来将event写入HDFS文件
 5 
 6     String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
 7     timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
 8             new ThreadFactoryBuilder().setNameFormat(rollerName).build());  //这个线程池用来滚动文件
 9 
10     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);  //用来存储文件的绝对路径以及对应的BucketWriter
11     sinkCounter.start();
12     super.start();
13   }

start方法主要是初始化两个线程池。

三、process()方法,是用来处理channel中的event的,非线程安全的,要确保HDFS中的文件是打开的。

 1 public Status process() throws EventDeliveryException {
 2     Channel channel = getChannel();    //获取对应的channel
 3     Transaction transaction = channel.getTransaction();//获得事务
 4     List<BucketWriter> writers = Lists.newArrayList(); //BucketWriter列表
 5     transaction.begin();
 6     try {
 7       int txnEventCount = 0;
 8       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//批量处理
 9         Event event = channel.take();    //获取event
10         if (event == null) {
11           break;
12         }
13 
14         // reconstruct the path name by substituting place holders
15         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
16             timeZone, needRounding, roundUnit, roundValue);    //格式化后的HDFS目录
17         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
18           timeZone, needRounding, roundUnit, roundValue);    //格式化后的文件名
19 
20         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;    //要写入的文件的HDFS绝对路径
21         BucketWriter bucketWriter = sfWriters.get(lookupPath);    //获取文件的BucketWriter
22 
23         // we haven't seen this file yet, so open it and cache the handle
24         if (bucketWriter == null) {    //如果没有这个文件
25             //根据fileType类型构造HDFSWriter(三种:SequenceFile、DataStream、CompressedStream)
26           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);    
27 
28           WriterCallback idleCallback = null;
29           if(idleTimeout != 0) {
30             idleCallback = new WriterCallback() {
31               @Override
32               public void run(String bucketPath) {
33                 sfWriters.remove(bucketPath);
34               }
35             };
36           }
37           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
38               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
39               suffix, codeC, compType, hdfsWriter, timedRollerPool,
40               proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
41 
42           sfWriters.put(lookupPath, bucketWriter);    //将文件路径和BucketWriter组成K-V,放入sfWriters
43         }
44 
45         // track the buckets getting written in this transaction
46         if (!writers.contains(bucketWriter)) {//如果BucketWriter列表没有正在写的文件——bucketWriter,则加入
47           writers.add(bucketWriter);
48         }
49 
50         // Write the data to HDFS
51         append(bucketWriter, event);    //将event写入bucketWriter对应的文件中
52       }
53 
54       if (txnEventCount == 0) {    //这次事务没有处理任何event
55         sinkCounter.incrementBatchEmptyCount();
56       } else if (txnEventCount == batchSize) {//一次处理batchSize个event
57         sinkCounter.incrementBatchCompleteCount();
58       } else {//channel中剩余的events不足batchSize
59         sinkCounter.incrementBatchUnderflowCount();
60       }
61 
62       // flush all pending buckets before committing the transaction
63       for (BucketWriter bucketWriter : writers) {    //将所有BucketWriter数据刷新到HDFS中
64         flush(bucketWriter);
65       }
66 
67       transaction.commit();    //提交事务
68 
69       if (txnEventCount < 1) {
70         return Status.BACKOFF;
71       } else {
72         sinkCounter.addToEventDrainSuccessCount(txnEventCount);
73         return Status.READY;
74       }
75     } catch (IOException eIO) {
76       transaction.rollback();//异常后回滚
77       LOG.warn("HDFS IO error", eIO);
78       return Status.BACKOFF;
79     } catch (Throwable th) {
80       transaction.rollback();//异常后回滚
81       LOG.error("process failed", th);
82       if (th instanceof Error) {
83         throw (Error) th;
84       } else {
85         throw new EventDeliveryException(th);
86       }
87     } finally {
88       transaction.close();//关闭事务
89     }
90   }

1、获取sink的channel和transaction,transaction.begin()是必要的步骤;

2、循环处理批量的event,如果event==null,说明channel已无数据,则退出循环;

3、realPath和realName都是格式化后的文件HDFS存储路径及文件名;lookupPath则是要写入的文件完整HDFS路径(目录+文件名);获取该文件对应的BucketWriter对象,要写入的文件及对应的BucketWriter对象需要存入sfWriters这个LinkedHashMap结构中,表示正在写的文件,BucketWriter类用来滚动文件、处理文件格式以及数据的序列化等操作,其实就是负责数据的写的;

4、如果文件对应的bucketWriter不存在,则文件需要滚动,创建一个BucketWriter对象,只有public方法才是线程安全的。

创建BucketWriter对象之前需要先构建一个HDFSWriter对象负责写文件,有三种类型:HDFSSequenceFile、HDFSDataStream、HDFSCompressedDataStream。

WriterCallback idleCallback是用来超时后滚动文件的时候调用的,前提得是配置文件中有配置hdfs.idleTimeout且不为0;

然后是new 一个BucketWriter对象,这有点复杂稍后说;

sfWriters.put(lookupPath, bucketWriter)然后就是将文件及对应的bucketWriter对象存入sfWriters中,表示正在写的文件。

5、这里要说下new BucketWriter对象的事。BucketWriter的构造函数首先是对众多参数赋值,然后isOpen = false,最后是this.writer.configure(context),即对writer对象进行配置。复杂就在这,这个writer对象是什么?它是上面4中所说的HDFSWriter。

HDFSWriterFactory工厂类会根据配置文件中设置的类型返回相应的HDFSWriter对象,没有配置文件类型的话默认是HDFSSequenceFile。

HDFSSequenceFile:configure(context)方法会首先获取写入格式writeFormat即参数"hdfs.writeFormat",默认格式是二进制的Writable(HDFSWritableSerializer.Builder.class),还有一个是Text(HDFSTextSerializer.Builder.class),第三个是null;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取writeFormat的所有配置信息serializerContext;然后根据writeFormat和serializerContext构造SequenceFileSerializer的对象serializer。在serializer中并无serializerContext配置的方法,在1.3.0中此处的serializerContext没有任何作用,可能是为以后做的预留。

HDFSDataStream:configure(context)方法先获取serializerType类型,默认是TEXT(BodyTextEventSerializer.Builder.class),此外还有HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class)、OTHER(null)、AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class)共四种类型;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取serializer的所有配置信息serializerContext。serializer的实例化是在HDFSDataStream.open(String filePath)方法中实现的。此处的serializerContext在BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未用到,可能是做预留,但是FlumeEventAvroEventSerializer在其Builder中用到了,并进行了配置。

HDFSCompressedDataStream:configure(context)方法和HDFSDataStream.configure(context)是一样的,serializerType的类型是一样的;其他也是一样。serializer的实例化是在HDFSCompressedDataStream.open(String filePath)方法中实现的,调用open(String filePath, CompressionCodec codec,CompressionType cType)来实例化。

6、如果存储着正在写的bucketWriter的writers列表中没有此bucketWriter,则添加进去,writers的存在是为了统一flush方便,后面会有介绍。

7、append(bucketWriter, event)这个是让bucketWriter处理event的方法,会使用bucketWriter.append(event)处理。这个方法的代码如下:

 1 public synchronized void append(Event event)
 2           throws IOException, InterruptedException {
 3     checkAndThrowInterruptedException();
 4     if (!isOpen) {
 5       if(idleClosed) {
 6         throw new IOException("This bucket writer was closed due to idling and this handle " +
 7             "is thus no longer valid");
 8       }
 9       open();//已经写完一个文件,新建新文件
10     }
11 
12     // check if it's time to rotate the file
13     if (shouldRotate()) {//检查行数、大小是否改完成一个文件
14       close();
15       open();//新建新文件
16     }
17 
18     // write the event
19     try {
20       sinkCounter.incrementEventDrainAttemptCount();
21       writer.append(event); // could block写数据
22     } catch (IOException e) {
23       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
24           bucketPath + ") and rethrowing exception.",
25           e.getMessage());
26       try {
27         close();
28       } catch (IOException e2) {
29         LOG.warn("Caught IOException while closing file (" +
30              bucketPath + "). Exception follows.", e2);
31       }
32       throw e;
33     }
34 
35     // update statistics
36     processSize += event.getBody().length;
37     eventCounter++;
38     batchCounter++;
39 
40     if (batchCounter == batchSize) {
41       flush();
42     }
43   }

A、首先会检查当前线程是否中断checkAndThrowInterruptedException();

B、BucketWriter初次运行时,isOpen=false(表示文件未打开不能写),idleClosed=false,会运行open()——doOpen()。fullFileName是"前缀.时间戳"组成的文件名,从这也可以看出时间戳部分不能更改,也就是HDFS中文件名无法自定义,除非自己定制HDFSSink;另外后缀名和压缩不能同时兼得,即如果没有配置压缩则可以在fullFileName后面添加自定义的后缀(比如后缀为.avro),否则只能添加压缩类型的后缀;bucketPath表示在HDFS中正在写的文件完整名字,这个名字中有标示正在写的文件的前后缀(inUsePrefix、inUseSuffix);targetPath这个是文件写完后的要更改成的完整名字,和bucketPath的区别是没有inUsePrefix、inUseSuffix;然后是根据有无压缩配置信息open此witer,没有压缩:writer.open(bucketPath),有压缩:writer.open(bucketPath, codeC, compType)。需要注意的是当使用Kerberos时,hadoop的RPC操作是非线程安全的包括getFileSystem()操作,open()操作在同一个JVM的同一时刻只能由一个线程使用,因为有可能导致死锁,见FLUME-1231。所以对open进行了同步。另外当在运行flume过程中出现类似异常“java.io.IOException: Callable timed out after 10000 ms on file”和"java.util.concurrent.TimeoutException"时,需要在这个方法上面config.set("dfs.socket.timeout", "3600000")增加超时时间,参考http://blog.csdn.net/yangbutao/article/details/8845025 

writer包含的三类均有两个open方法,一个是对应不压缩的open(String filePath) ,一个是对应压缩的open(String filePath, CompressionCodec codec,CompressionType cType)。

首先writer若为HDFSSequenceFile,是支持压缩的,open(String filePath)会调用open(filePath, null, CompressionType.NONE)压缩方法,只不过没有压缩类型。压缩open方法先判断是否使用了本地文件系统,然后根据hadoop的配置信息是否支持追加"hdfs.append.support",构造相应的SequenceFile即writer。其中的serializer若为HDFSWritableSerializer则writer的Key为LongWritable类型,Value为BytesWritable二进制类型;若为HDFSTextSerializer,writer的Key为LongWritable类型,Value为Text文本类型。

其次writer若为HDFSDataStream,是不支持压缩的。它的压缩方法open(String filePath, CompressionCodec codec,CompressionType cType)直接调用非压缩方法open(filePath)。open(filePath)判断是否使用了本地文件系统;然后根据是否支持append操作(获取配置的"hdfs.append.support"参数),构造对应的输出流outStream;然后构造serializer,有三种类型BodyTextEventSerializer、HeaderAndBodyTextEventSerializer、FlumeEventAvroEventSerializer,前两种支持追加,最后一种不支持追加,所以FlumeEventAvroEventSerializer不能将"hdfs.append.support"设置为true。如果支持追加就执行serializer.afterReopen()前两种serializer未实现这个方法(1.3.0),不支持就serializer.afterCreate()前两种也未实现这个方法,第三种则是dataFileWriter.create(getSchema(), getOutputStream())。

最后writer若为HDFSCompressedDataStream,就是针对压缩的,其open(String filePath)会使用默认的DefaultCodec以及CompressionType.BLOCK来调用压缩open(String filePath, CompressionCodec codec,CompressionType cType)。压缩方法和HDFSDataStream的压缩方法类似,区别有两点一个是serializer的输出流变成压缩输出流了;另一个就是最后加了isFinished = false表示压缩流是否完毕。

回到BucketWriter,如果rollInterval(按时间滚动文件)不为0,则创建一个Callable,放入timedRollFuture中rollInterval秒之后关闭文件,默认是30s写一个文件,这只是控制文件滚动的3个条件之一;

 isOpen = true表示文件已打开,可以write了。

C、回到上面7中,shouldRotate()方法会判断文件中的行数和文件的大小是否达到配置文件中的配置,如果任何一个满足条件则可以关闭文件,这是控制文件滚动的3个条件中的两个。close()方法会关闭文件,再清理俩线程池及一些其他的清理工作,及改名(将.tmp文件改名),再open()就又到了上面B中所说的。

D、writer.append(event)这是向HDFS中写数据的地方。这里又要分很多讨论了,因为writer有三类。

writer为HDFSSequenceFile:append(event)方法,会先通过serializer.serialize(e)把event处理成一个Key和一个Value。

(1)serializer为HDFSWritableSerializer时,则Key会是event.getHeaders().get("timestamp"),如果没有"timestamp"的Headers则使用当前系统时间System.currentTimeMillis(),然后将时间封装成LongWritable;Value是将event.getBody()封装成BytesWritable,代码是bytesObject.set(e.getBody(), 0, e.getBody().length);

(2)serializer为HDFSTextSerializer时,Key和上述HDFSWritableSerializer一样;Value会将event.getBody()封装成Text,代码是textObject.set(e.getBody(), 0, e.getBody().length)。

writer.append(event)中会将Key和Value,writer.append(record.getKey(), record.getValue())。

writer为HDFSDataStream:append(event)方法直接调用serializer.write(e)。

(1)serializer为BodyTextEventSerializer,则其write(e)方法会将e.getBody()写入输出流,并根据配置再写入一个"\n";

(2)serializer为HeaderAndBodyTextEventSerializer,则其write(e)方法会将e.getHeaders() + " "(注意此空格)和e.getBody()写入输出流,并根据配置再写入一个"\n";

(3)serializer为FlumeEventAvroEventSerializer,则其write(e)方法会将event整体写入dataFileWriter。

writer为HDFSCompressedDataStream:append(event)方法会首先判断是否完成一个阶段的压缩isFinished,如果是则更新压缩输出流的状态,并isFinished=false,否则剩下的执行和HDFSDataStream.append(event)相同。

E、是做一些统计工作processSize是统计文件大小的;eventCounter是统计文件行数的;batchCounter是统计最近一次flush之后的处理的event数;

F、如果处理的event数达到batchSize则刷新到HDFS中,flush()。flush()方法会首先执行writer.sync()即写入HDFS,然后清空batchCounter表明这次batch已经完成,可以准备下次的。涉及到writer就会涉及很多写入类型:

writer为HDFSSequenceFile:sync()方法执行SequenceFile.Writer.syncFs()将数据写入HDFS中;

writer为HDFSDataStream:sync()方法执行

writer为HDFSCompressedDataStream:sync()方法先执行serializer.flush():只有FlumeEventAvroEventSerializer的flush()方法也有实现dataFileWriter.flush(),其他俩BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未实现flush()方法。然后执行outStream.flush()和outStream.sync()将数据刷新至HDFS中。

如果idleTimeout>0,表示文件超时时间,超时后就成为无效文件需要关闭(默认是0不允许关闭的),构造一个Callable对象idleAction执行内容是:close()方法,idleClosed = true表示超时关闭了这个bucketwriter,而且onIdleCallback.run(onIdleCallbackPath)会将onIdleCallbackPath从HDFSEventSink.sfWriters中删除对应对应的bucketwriter,表示这个文件已经写完了。然后将这个idleAction放入timedRollerPool中idleTimeout秒后执行。

8、回到HDFSEventSink.process()方法中,会根据这次事务处理的event数量更新相应的计数器;

9、遍历writers,挨个刷新BucketWriter至HDFS;

10、transaction.commit();//提交事务

11、transaction.rollback();//异常后回滚

12、transaction.close();//关闭事务

四、stop()方法。首先会遍历sfWriters,挨个close(BucketWriter):BucketWriter.close()方法,如果isOpen=true表示文件还处于打开状态,则writer.close()(这里的writer就不分情况了,HDFSSequenceFile就直接writer.close();其他俩都是先flush(好些都没实现)再beforClose(好些都没实现)输出流再flush、sync、close),BucketWriter.close()方法方法接下来关闭俩线程池以及改名等清理操作。HDFSEventSink的stop()方法接下来是关闭俩线程池,清理一些数据比如sfWriters.clear()。

ps:1、BucketWriter中的public方法都是线程安全的,包括append、close、flush三个均是同步方法,会调用相应的do方法,做具体的操作。

2、callWithTimeout方法需要注意,在HDFSEventSink中多次用到这个方法:append、flush、close,这个方法会将对应的Callable放入callTimeoutPool线程池中执行,并等待callTimeout(默认是10000) ms返回结果。

问题1:WriterLinkedHashMap的sfWriters除了设置hdfs.idleTimeout且>0时才会从sfWriters中remove掉超时的bucketwriter,其它地方并没有发现remove操作,那么以后随着写入文件的增多sfWriters会不会始终增大?

解:肯定不会啊。别忘了还有一个"hdfs.maxOpenFiles"参数默认是5000,追踪发现HDFSEventSink内部静态类WriterLinkedHashMap继承了LinkedHashMap,并重写了removeEldestEntry方法,这个方法在sfWriters.put时总会调用,当sfWriters.size()>maxOpenFiles时就是自动清理之时了。maxOpenFiles就是sfWriters得最大容量。

这次的sink比较复杂,希望我写的大伙能够看懂,期间还有一些细节不太清楚,不过不影响整体的理解。

不解1:bucketwriter类中的doOpen方法中hadoop的RPC线程非安全,说是可以从FLUME-1231这得到解释

不解2:同样doOpen方法中有说“Need to get reference to FS using above config before underlying  writer does in order to avoid shutdown hook & IllegalStateExceptions  这里也表示疑问,为什么这么说?

不解3:为什么HDFSWriter的3个实现类的open()方法中,均考虑了conf.getBoolean("hdfs.append.support", false) == true?一个是可追加的一个是不可追加的。但是都是一个SequenceFile.Writer或者FSDataOutputStream,尤其是在HDFSSequenceFile中的writer能不能追加似乎根本没什么区别,充其量是一个writer的参数是FSDataOutputStream,另外一个则不是,其他俩好歹还有需要设置appending=true用来判断是否可重复打开但也是有点牵强,都可以合二为一,但是为什么不那么做呢?

 不解4:BucketPath.escapeString这个方法还没搞懂,导致格式化的结果不甚明了。。。哎

欢迎大伙交流!!

 类似资料: