版本
基于 Flume CDH 版
版本号: flume-ng-1.6.0-cdh5.7.0
Source 模块
Source的核心处理为
- 构造出一个核心对象Source,然后透过线程不停监听目标源
- 在Source的内部会有ChannelProcessor来负责读取数据写入Channel
核心对象 Source
org.apache.flume.Source 是Flume对源的抽象.
核心逻辑
它的核心是一个来自外部传入的ChannelProcessor.
这代表着Source应该以怎样的方式从数据源读取数据并写入到目标Channel中.
void setChannelProcessor(ChannelProcessor channelProcessor);
Source分类
在Flume的内部设计中,将Source分为两大类
- EventDrivenSource
纯粹以event驱动,不需要额外驱动程序参与的 比如 AvroSource、ExecSource,SpoolDirectorySource .
这类Source需要自提供线程执行 - PollableSource
需要以驱动程序来读取的Source 比如 TaildirSource、kafkaSource 这类Source将有框架线程执行
核心对象工厂 SourceFactory
org.apache.flume.SourceFactory
以怎样的方式(用户配置属性)构造出核心对象 org.apache.flume.Source的某一个具体实现
核心逻辑
SourceFactory 通过 SourceName 构造一个指定的Source具体实现子类
Source create(String sourceName, String type)
SourceRunner
org.apache.flume.SourceRunner 是Source的执行抽象
它代表应该以如何的策略启动执行一个Source的执行线程,比如是以每个Source的独立线程启动,还是以框架的共有线程启动等等
核心逻辑
SourceRunner 的核心方法是forSource,将一个构造好的Source对象线程执行起来
对应的,在SourceRunner中,也有两类 EventDrivenSourceRunner 和 PollableSourceRunner
这两者处理上区别是
- EventDrivenSourceRunner 将以每一个Source实例作为一个独立线程启动执行
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
}
return runner;
}
Channel 模块
Flume的Channel,是整个Flume的核心部分.虽然在体系上Source,Channel,Sink平级,但实际设计中,核心逻辑都是围绕Channel来展开的,其地位要高于Source和Sink
Flume的Channel设计,实质是设计模式中的组合模式,它将业务逻辑为两个部分独立抽象
- Channel 自身的业务逻辑抽象
- Transaction 部分的业务逻辑抽象
Channel
public interface Channel extends LifecycleAware, NamedComponent {
public void put(Event event) throws ChannelException;
public Event take() throws ChannelException;
public Transaction getTransaction();
}
这是所有Channel的接口,代表Channel的核心三要素:put,get,transaction
在Channel部分自身的设计中,使用设计模式中的模板模式,为整个Channel制定了骨干逻辑
org.apache.flume.channel.AbstractChannel
抽象类 org.apache.flume.channel.AbstractChannel,为Channel制定了LifecycleAware方面逻辑
org.apache.flume.channel.BasicChannelSemantics
抽象类 BasicChannelSemantics.为Channel制定了核心三要素,put,get,transaction的骨干逻辑,将Channel的Put/Get操作,转换为Transaction的转换操作
这三者继承关系如下
| 继承级别 | 类 |
|---|---|
| 顶层基类 | org.apache.flume.Channel |
| 继承自Channel | org.apache.flume.channel.AbstractChannel |
| 继承自AbstractChannel | org.apache.flume.channel.BasicChannelSemantics |
BasicChannelSemantics 重点关注
因为实际Channel一般实现BasicChannelSemantics,直接套用骨干逻辑执行,将Channel的抽象转换为Trans的抽象
public abstract class BasicChannelSemantics extends AbstractChannel {
//当前事务 ThreadStatic
private ThreadLocal<BasicTransactionSemantics> currentTransaction
= new ThreadLocal<BasicTransactionSemantics>();
/**
* 将创建事务的逻辑延迟到子类完成,这也是子类需要实现的地方.
* 毕竟基类只是骨架,它不可能知道一个具体的事务是若何产生的,只能交给子类去实现
*/
protected abstract BasicTransactionSemantics createTransaction();
/**
* channel.put 的定制流程
* channel.put 的实质等于获取一个 transaction 来 transaction.put
*/
public void put(Event event) throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
transaction.put(event);
}
/**
* channel.take 的定制流程
* channel.take 的实质等于获取一个 transaction 来 transaction.take
*/
public Event take() throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
return transaction.take();
}
/**
* channel.transaction 的定制流程
* 定制如何获取一个 transaction
*/
public Transaction getTransaction() {
//如果没有初始化过就调用一次初始化方法.
//保证只初始化一次,标准的双锁模式
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
//取事务的定制
//线程ThreadStatic,如果没有,或者有但是已被关闭,就新开一个事务
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
//这里实质是使用子类中的创建事务
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
}
BasicTransactionSemantics
在BasicChannelSemantics中,已经定制了如何在Channel中take,put等等,但那只是骨架
org.apache.flume.channel.BasicTransactionSemantics 定义了 如何真正以事务的方式 take,put
BasicTransactionSemantics 也是一个模板模式
public abstract class BasicTransactionSemantics implements Transaction {
/**
* 这6个方法是真正需要子类实现的地方
* BasicTransactionSemantics 提供对这6个方法的模板调用
*/
protected void doBegin() throws InterruptedException {}
protected abstract void doPut(Event event) throws InterruptedException;
protected abstract Event doTake() throws InterruptedException;
protected abstract void doCommit() throws InterruptedException;
protected abstract void doRollback() throws InterruptedException;
protected void doClose() {}
protected void put(Event event) {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"put() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"put() called when transaction is %s!", state);
Preconditions.checkArgument(event != null,
"put() called with null event!");
try {
doPut(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}
protected Event take() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"take() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"take() called when transaction is %s!", state);
try {
return doTake();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
public void begin() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"begin() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.NEW),
"begin() called when transaction is " + state + "!");
try {
doBegin();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
state = State.OPEN;
}
public void commit() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"commit() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"commit() called when transaction is %s!", state);
try {
doCommit();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
state = State.COMPLETED;
}
public void rollback() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"rollback() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"rollback() called when transaction is %s!", state);
state = State.COMPLETED;
try {
doRollback();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}
public void close() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"close() called from different thread than getTransaction()!");
Preconditions.checkState(
state.equals(State.NEW) || state.equals(State.COMPLETED),
"close() called when transaction is %s"
+ " - you must either commit or rollback first", state);
state = State.CLOSED;
doClose();
}
}
ChannelProcessor
ChannelProcessor 包裹一个代表一个或多个Channel实例的ChannelSelector
它是负责 以怎样的策略处理由Source到Channel,比如拦截器,事务包裹等等 后面详述
SinkProcessor
org.apache.flume.SinkProcessor 包裹一个或多个Sink实例
它是负责以怎么样的策略处理从Channel到Sink,比如容灾输出,负载均衡输出等等策略,后面详述
Sink
核心对象 Sink
org.apache.flume.Sink 与Source类似,对输出的抽象
它的核心是一个Channel对象的引用和等待子类实现的Process方法
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
public Status process() throws EventDeliveryException;
}
org.apache.flume.sink.AbstractSink
这是一个提供Sink的LifecycleAware默认实现的抽象类.一般Sink实际是继承AbstractSink
abstract public class AbstractSink implements Sink, LifecycleAware {
private Channel channel;
private String name;
private LifecycleState lifecycleState;
public AbstractSink() {
lifecycleState = LifecycleState.IDLE;
}
...Lifecycle相关...
}
核心对象工厂 SinkFactory
org.apache.flume.SinkFactory 与Source类似,根据用户指定的配置产生一个可用的Sink子类实现实例
核心对象线程载体 SinkRunner
org.apache.flume.SinkRunner 是Sink的线程载体
它的核心就是使用线程不停的去执行包裹Sink的SinkProcessor.Process方法
public void run() {
logger.debug("Polling sink runner starting");
while (!shouldStop.get()) {
try {
if (policy.process().equals(Sink.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.debug("Interrupted while processing an event. Exiting.");
counterGroup.incrementAndGet("runner.interruptions");
} catch (Exception e) {
logger.error("Unable to deliver event. Exception follows.", e);
if (e instanceof EventDeliveryException) {
counterGroup.incrementAndGet("runner.deliveryErrors");
} else {
counterGroup.incrementAndGet("runner.errors");
}
try {
Thread.sleep(maxBackoffSleep);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}