NightPxy 个人技术博客

flume-源码解析-Source

Posted on By NightPxy

SpoolDirectorySource

SpoolDirectorySource 在 start() 方法中创建一个 SpoolDirectoryRunnable 任务,并交给 executor 以固定延迟(POLL_DELAY_MS)周期性地调度执行,由它负责监听并读取目录中的文件:

 // Excerpt from start(): wrap the reader and the counter in the polling task.
 Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
 // Schedule the task with an initial delay of 0 and POLL_DELAY_MS between runs.
 // Assuming executor is a java.util.concurrent.ScheduledExecutorService, the
 // delay is measured from the end of one run to the start of the next, so
 // runs never overlap.
 executor.scheduleWithFixedDelay(
        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

// Delegate to the parent class's start() once the polling task is scheduled.
super.start();

SpoolDirectoryRunnable

SpoolDirectoryRunnable 的 run() 方法核心代码如下:

/**
 * One polling pass: repeatedly reads batches of events from the reader and
 * hands each batch to the channel processor, committing the reader after
 * every successful hand-off.
 *
 * <p>The loop ends when the thread is interrupted or when the reader returns
 * an empty batch. On a ChannelException the batch is not committed and, if
 * backoff is enabled, the thread sleeps with a doubling delay capped at
 * maxBackoff before retrying. Any other Throwable is logged as fatal, recorded
 * in hasFatalError and passed to Throwables.propagate.
 */
public void run() {
      // Initial backoff delay in milliseconds; doubled on each channel failure.
      int backoffInterval = 250;
      try {
        // Thread.interrupted() also clears the interrupt flag when it returns true.
        while (!Thread.interrupted()) {
          // Ask the reader for up to batchSize events from the watched directory.
          List<Event> events = reader.readEvents(batchSize);
          if (events.isEmpty()) {
            // Nothing to read right now: end this pass.
            break;
          }
          // "Received" counters are updated before delivery is attempted.
          sourceCounter.addToEventReceivedCount(events.size());
          sourceCounter.incrementAppendBatchReceivedCount();

          try {
            // Once events have been read, getChannelProcessor() supplies the ChannelProcessor.
            // The ChannelProcessor stands in for the channels: it is Flume's abstraction
            // over a group of Channels.
            // ChannelProcessor.processEventBatch writes the data to every channel.
            // Writing to a channel means calling the channel's put method, which adds the
            // events to the put list of the channel transaction.
            getChannelProcessor().processEventBatch(events);
            // Commit on the reader: this is the source's own commit
            // (the SpoolDirectorySource ".complete" step).
            reader.commit();
          } catch (ChannelException ex) {
            logger.warn("The channel is full, and cannot write data now. The " +
              "source will try again after " + String.valueOf(backoffInterval) +
              " milliseconds");
            // Flag is a field declared outside this method; its readers are not shown here.
            hitChannelException = true;
            if (backoff) {
              // An InterruptedException thrown by sleep is not caught by this handler;
              // it reaches the outer catch (Throwable) below.
              TimeUnit.MILLISECONDS.sleep(backoffInterval);
              // Double the delay, then clamp it to maxBackoff.
              backoffInterval = backoffInterval << 1;
              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
                                backoffInterval;
            }
            // Skip the "accepted" counters and the backoff reset; reader.commit() was
            // not reached for this batch.
            // NOTE(review): presumably the next readEvents call returns the same
            // uncommitted batch again - confirm against the reader implementation.
            continue;
          }
          // Successful delivery: restore the initial backoff and count the batch as accepted.
          backoffInterval = 250;
          sourceCounter.addToEventAcceptedCount(events.size());
          sourceCounter.incrementAppendBatchAcceptedCount();
        }
      } catch (Throwable t) {
        logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
            "Uncaught exception in SpoolDirectorySource thread. " +
            "Restart or reconfigure Flume to continue processing.", t);
        // Record the failure in a field of the enclosing source before propagating.
        hasFatalError = true;
        // Presumably Guava's Throwables.propagate, which rethrows t (wrapping
        // checked exceptions in a RuntimeException) - confirm against the imports.
        Throwables.propagate(t);
      }
    }