NightPxy 个人技术博客

flume-源码解析-架构

Posted on By NightPxy

版本

基于 Flume CDH 版
版本号: flume-ng-1.6.0-cdh5.7.0

Source 模块

Source的核心处理为

  • 构造出一个核心对象Source,然后透过线程不停监听目标源
  • 在Source的内部会有ChannelProcessor来负责读取数据写入Channel

核心对象 Source

org.apache.flume.Source 是Flume对源的抽象.

核心逻辑

它的核心是一个来自外部传入的ChannelProcessor.
这代表着Source应该以怎样的方式从数据源读取数据并写入到目标Channel中.

void setChannelProcessor(ChannelProcessor channelProcessor);

Source分类

在Flume的内部设计中,将Source分为两大类

  • EventDrivenSource
    纯粹以event驱动,不需要额外驱动程序参与的 比如 AvroSource、ExecSource,SpoolDirectorySource .
    这类Source需要自提供线程执行
  • PollableSource
    需要以驱动程序来读取的Source 比如 TaildirSource、kafkaSource 这类Source将有框架线程执行

核心对象工厂 SourceFactory

org.apache.flume.SourceFactory
以怎样的方式(用户配置属性)构造出核心对象 org.apache.flume.Source的某一个具体实现

核心逻辑

SourceFactory 通过 SourceName 构造一个指定的Source具体实现子类

Source create(String sourceName, String type) 

SourceRunner

org.apache.flume.SourceRunner 是Source的执行抽象
它代表应该以如何的策略启动执行一个Source的执行线程,比如是以每个Source的独立线程启动,还是以框架的共有线程启动等等

核心逻辑

SourceRunner 的核心方法是forSource,将一个构造好的Source对象线程执行起来
对应的,在SourceRunner中,也有两类 EventDrivenSourceRunner 和 PollableSourceRunner

这两者处理上区别是

  • EventDrivenSourceRunner 将以每一个Source实例作为一个独立线程启动执行
public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;
  }

Channel 模块

Flume的Channel,是整个Flume的核心部分.虽然在体系上Source,Channel,Sink平级,但实际设计中,核心逻辑都是围绕Channel来展开的,其地位要高于Source和Sink

Flume的Channel设计,实质是设计模式中的组合模式,它将业务逻辑为两个部分独立抽象

  • Channel 自身的业务逻辑抽象
  • Transaction 部分的业务逻辑抽象

Channel

public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

这是所有Channel的接口,代表Channel的核心三要素:put,get,transaction

在Channel部分自身的设计中,使用设计模式中的模板模式,为整个Channel制定了骨干逻辑

org.apache.flume.channel.AbstractChannel
抽象类 org.apache.flume.channel.AbstractChannel,为Channel制定了LifecycleAware方面逻辑

org.apache.flume.channel.BasicChannelSemantics
抽象类 BasicChannelSemantics.为Channel制定了核心三要素,put,get,transaction的骨干逻辑,将Channel的Put/Get操作,转换为Transaction的转换操作

这三者继承关系如下

继承级别
顶层基类 org.apache.flume.Channel
继承自Channel org.apache.flume.channel.AbstractChannel
继承自AbstractChannel org.apache.flume.channel.BasicChannelSemantics

BasicChannelSemantics 重点关注
因为实际Channel一般实现BasicChannelSemantics,直接套用骨干逻辑执行,将Channel的抽象转换为Trans的抽象

public abstract class BasicChannelSemantics extends AbstractChannel {
    //当前事务 ThreadStatic 
    private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();
    
    /**
    * 将创建事务的逻辑延迟到子类完成,这也是子类需要实现的地方.
    * 毕竟基类只是骨架,它不可能知道一个具体的事务是若何产生的,只能交给子类去实现  
    */
    protected abstract BasicTransactionSemantics createTransaction();
    
    /**
    * channel.put 的定制流程  
    *    channel.put 的实质等于获取一个 transaction 来 transaction.put
    */
    public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
    }
    
    /**
    * channel.take 的定制流程  
    *    channel.take 的实质等于获取一个 transaction 来 transaction.take
    */
    public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
    }
    
    /**
    * channel.transaction 的定制流程  
    *    定制如何获取一个 transaction 
    */
    public Transaction getTransaction() {
        //如果没有初始化过就调用一次初始化方法.
        //保证只初始化一次,标准的双锁模式
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }
        
        //取事务的定制
        //线程ThreadStatic,如果没有,或者有但是已被关闭,就新开一个事务 
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
                BasicTransactionSemantics.State.CLOSED)) {
          //这里实质是使用子类中的创建事务      
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }
}

BasicTransactionSemantics

在BasicChannelSemantics中,已经定制了如何在Channel中take,put等等,但那只是骨架
org.apache.flume.channel.BasicTransactionSemantics 定义了 如何真正以事务的方式 take,put
BasicTransactionSemantics 也是一个模板模式

public abstract class BasicTransactionSemantics implements Transaction {

  /**
  * 这6个方法是真正需要子类实现的地方
  * BasicTransactionSemantics 提供对这6个方法的模板调用
  */
  protected void doBegin() throws InterruptedException {}
  protected abstract void doPut(Event event) throws InterruptedException;
  protected abstract Event doTake() throws InterruptedException;
  protected abstract void doCommit() throws InterruptedException;
  protected abstract void doRollback() throws InterruptedException;
  protected void doClose() {}
  
  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");

    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }
  
  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);

    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
  
  public void begin() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "begin() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.NEW),
        "begin() called when transaction is " + state + "!");

    try {
      doBegin();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
    state = State.OPEN;
  }
  
  public void commit() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "commit() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "commit() called when transaction is %s!", state);

    try {
      doCommit();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
    state = State.COMPLETED;
  }

  public void rollback() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "rollback() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "rollback() called when transaction is %s!", state);

    state = State.COMPLETED;
    try {
      doRollback();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

  public void close() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "close() called from different thread than getTransaction()!");
    Preconditions.checkState(
            state.equals(State.NEW) || state.equals(State.COMPLETED),
            "close() called when transaction is %s"
            + " - you must either commit or rollback first", state);

    state = State.CLOSED;
    doClose();
  }
}

ChannelProcessor

ChannelProcessor 包裹一个代表一个或多个Channel实例的ChannelSelector
它是负责 以怎样的策略处理由Source到Channel,比如拦截器,事务包裹等等 后面详述

SinkProcessor

org.apache.flume.SinkProcessor 包裹一个或多个Sink实例
它是负责以怎么样的策略处理从Channel到Sink,比如容灾输出,负载均衡输出等等策略,后面详述

Sink

核心对象 Sink

org.apache.flume.Sink 与Source类似,对输出的抽象
它的核心是一个Channel对象的引用和等待子类实现的Process方法

public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;
}

org.apache.flume.sink.AbstractSink
这是一个提供Sink的LifecycleAware默认实现的抽象类.一般Sink实际是继承AbstractSink

abstract public class AbstractSink implements Sink, LifecycleAware {
  private Channel channel;
  private String name;

  private LifecycleState lifecycleState;

  public AbstractSink() {
    lifecycleState = LifecycleState.IDLE;
  }
  ...Lifecycle相关...
}

核心对象工厂 SinkFactory

org.apache.flume.SinkFactory 与Source类似,根据用户指定的配置产生一个可用的Sink子类实现实例

核心对象线程载体 SinkRunner

org.apache.flume.SinkRunner 是Sink的线程载体
它的核心就是使用线程不停的去执行包裹Sink的SinkProcessor.Process方法

public void run() {
      logger.debug("Polling sink runner starting");

      while (!shouldStop.get()) {
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.debug("Interrupted while processing an event. Exiting.");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }