Flume Deployment


Multi-Tier Collection Flows

In real production deployments, log collection is typically architected as a multi-tier collection flow. In Flume, a multi-tier flow is built by pairing an Avro Sink on the upstream agent with an Avro Source on the downstream agent.
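A minimal sketch of the pairing (the agent names, host name, and port here are hypothetical): the upstream agent's Avro Sink connects to the host and port on which the downstream agent's Avro Source listens.

# Upstream agent: ships its events over Avro RPC to the collector tier
upstream.sinks.avro-sink.type = avro
upstream.sinks.avro-sink.hostname = collector-host
upstream.sinks.avro-sink.port = 40010
upstream.sinks.avro-sink.channel = mem-channel

# Downstream agent: accepts events from any number of upstream agents
downstream.sources.avro-source.type = avro
downstream.sources.avro-source.bind = 0.0.0.0
downstream.sources.avro-source.port = 40010
downstream.sources.avro-source.channels = mem-channel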

AVRO-Source

The Avro Source uses a Netty server to handle incoming requests. Netty is built on Java's non-blocking I/O (NIO), so it is quite efficient.

AVRO-Source Configuration

| Property | Description |
| --- | --- |
| type | avro |
| bind | Hostname or IP address to listen on |
| port | Port to listen on |
| threads | Maximum number of worker threads; unbounded by default |
| compression-type | Compression type: none (no compression) or deflate (compressed) |
| ipFilter | Enables Netty's IP filtering |
| ipFilterRules | Netty IP filter rules, e.g. allow:ip:127.*,allow:name:localhost,deny:ip:* |
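For example, a collector source that accepts compressed traffic from localhost only might look like this (agent and channel names are illustrative):

collector.sources.avro-source.type = avro
collector.sources.avro-source.bind = 0.0.0.0
collector.sources.avro-source.port = 40010
collector.sources.avro-source.compression-type = deflate
collector.sources.avro-source.ipFilter = true
collector.sources.avro-source.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
collector.sources.avro-source.channels = mem-channel

Note that when compression-type is deflate, the upstream Avro Sink must be configured with the same compression-type so the two ends match.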

Channel Selector

A channel selector is a concept on the Source side: as the source writes events into its channels, it can pick which channel(s) receive each event according to a configurable policy.
This makes features such as channel routing, fan-out, and high availability possible.

Replicating Channel Selector

The replicating selector sends every event arriving from the source to all of the configured channels (a complete working example appears in the Example Configurations section below).

Multiplexing Channel Selector

The multiplexing selector routes each event to a subset of the channels, chosen by the value of a configured event header, as sketched below.
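A sketch of header-based routing, assuming hypothetical source and channel names and events that carry a state header:

agent.sources.src.selector.type = multiplexing
agent.sources.src.selector.header = state
agent.sources.src.selector.mapping.CZ = cz-channel
agent.sources.src.selector.mapping.US = us-channel
agent.sources.src.selector.default = other-channel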

Sink Processor

Sink Processor is a concept on the Sink side.
By default each sink is driven by its own single-sink processor, but several sinks can be grouped together to add load distribution, high availability, and similar behavior.

Grouping multiple sinks does not conflict with the single-sink mechanism: the extra sinks exist only for load balancing and failover, and in the end exactly one sink actually processes any given event.
Sink processors are attached to a sink group, declared as shown below.
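A minimal sink group declaration (group and sink names are illustrative; the processor type is whichever of the processors described below you need):

agent.sinkgroups = group1
agent.sinkgroups.group1.sinks = sink1 sink2
agent.sinkgroups.group1.processor.type = failover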

Failover Sink Processor

The Failover Sink Processor maintains a prioritized list of sinks and guarantees that, as long as at least one of them is available, events will be processed by a sink from the list.

How it works
Internally it keeps two pools of sinks, an active pool and a failed pool, both ordered by priority.
Each delivery uses the highest-priority sink in the current active pool. When a sink fails a request, it is moved to the failed pool and given a cooldown period; once a failed sink's cooldown expires, it re-enters the active pool.
As sinks move in and out of the two pools, delivery keeps flowing through the highest-priority sink that is currently usable.

Configuration

| Property | Description |
| --- | --- |
| processor.type | failover |
| processor.priority.<sinkName> | Priority of the named sink; higher values mean higher priority. Priorities must be unique |
| processor.maxpenalty | Maximum cooldown for a failed sink, in milliseconds. Default: 30000 |
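Putting this together, a failover group over two hypothetical sinks could look like:

agent.sinkgroups = group1
agent.sinkgroups.group1.sinks = primary-sink backup-sink
agent.sinkgroups.group1.processor.type = failover
agent.sinkgroups.group1.processor.priority.primary-sink = 10
agent.sinkgroups.group1.processor.priority.backup-sink = 5
agent.sinkgroups.group1.processor.maxpenalty = 10000

With this configuration, events flow through primary-sink until it fails, fall back to backup-sink, and return to primary-sink once its cooldown expires.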

Load balancing Sink Processor

The Load balancing Sink Processor provides the ability to spread the event flow over multiple sinks.

Balancing mechanism

The built-in selection mechanisms distribute load either round-robin (round_robin) or randomly (random).
On each delivery the selector picks the next sink according to the configured mechanism and invokes it.

Failure handling and blacklisting

If the selected sink fails to deliver the event, the processor picks the next available sink via the configured selection mechanism. This implementation does not blacklist the failing sink; instead it keeps optimistically retrying every available sink. If every sink invocation fails, the selector propagates the failure to the sink runner.

If backoff is enabled, the processor blacklists a failing sink, removing it from selection for a given timeout. When the timeout expires and the sink is still unresponsive, the timeout grows exponentially, to avoid getting stuck in long waits on an unresponsive sink. With backoff disabled, under round_robin all the traffic destined for a failed sink is simply passed on to the next sink in line, so the load is no longer balanced.

Configuration

| Property | Description |
| --- | --- |
| processor.type | load_balance |
| processor.backoff | Whether to blacklist (back off) failed sinks. Default: false |
| processor.selector | Selection mechanism: round_robin (default) or random |
| processor.selector.maxTimeOut | Maximum blacklist cooldown, in milliseconds. Default: 30000 |
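A sketch of a round-robin group with backoff enabled (group and sink names are illustrative):

agent.sinkgroups = group1
agent.sinkgroups.group1.sinks = sink1 sink2
agent.sinkgroups.group1.processor.type = load_balance
agent.sinkgroups.group1.processor.backoff = true
agent.sinkgroups.group1.processor.selector = round_robin
agent.sinkgroups.group1.processor.selector.maxTimeOut = 30000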

Example Configurations

The first example fans a netcat source out to two channels with the replicating selector; the second builds a two-tier collection flow out of AVRO sink/source pairs.

# ---------------      Agent      -------------------
# netcat-source collects events; the replicating selector pushes them to logger-channel and hdfs-channel

agent.sources = netcat-source
agent.channels = logger-channel hdfs-channel
agent.sinks = logger-sink hdfs-sink

# ---------------      Source     -------------------

agent.sources.netcat-source.type = netcat
agent.sources.netcat-source.bind = 0.0.0.0
agent.sources.netcat-source.port = 44444
agent.sources.netcat-source.selector.type = replicating
agent.sources.netcat-source.channels = logger-channel hdfs-channel

# ---------------      Channel    -------------------

# logger-channel
agent.channels.logger-channel.type = memory
agent.channels.logger-channel.capacity = 10000
agent.channels.logger-channel.transactionCapacity = 10000
agent.channels.logger-channel.byteCapacityBufferPercentage = 20
agent.channels.logger-channel.byteCapacity = 800000

# hdfs-channel
agent.channels.hdfs-channel.type = memory
agent.channels.hdfs-channel.capacity = 10000
agent.channels.hdfs-channel.transactionCapacity = 10000
agent.channels.hdfs-channel.byteCapacityBufferPercentage = 20
agent.channels.hdfs-channel.byteCapacity = 800000

# ---------------      Sink     -------------------

# logger-sink
agent.sinks.logger-sink.type = logger
agent.sinks.logger-sink.channel = logger-channel

# hdfs-sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.channel = hdfs-channel
agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop000:9000/flume/test/netcat-memory-select-logger-hdfs/dt=%y-%m-%d/
agent.sinks.hdfs-sink.hdfs.filePrefix = events-
agent.sinks.hdfs-sink.hdfs.fileSuffix = .log
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.rollInterval = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 0
agent.sinks.hdfs-sink.hdfs.rollSize = 100
agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true



flume-ng agent \
--name agent  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/netcat-memory-select-logger-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55555
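Once the agent is up, you can feed it test lines with nc and watch both the logger output and the HDFS path fill up; the HTTP monitoring server started above also exposes JSON metrics:

# send a test event to the netcat source
echo "hello flume" | nc localhost 44444

# inspect channel/sink counters via the monitoring endpoint
curl http://localhost:55555/metrics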

# ---------------   nc-agent      -------------------

nc-agent.sources = nc-agent-netcat-source
nc-agent.channels = nc-agent-memory-channel
nc-agent.sinks = nc-agent-avro-sink

# ---------------   nc-agent-netcat-source      -------------------

nc-agent.sources.nc-agent-netcat-source.channels = nc-agent-memory-channel
nc-agent.sources.nc-agent-netcat-source.type = netcat
nc-agent.sources.nc-agent-netcat-source.bind = 0.0.0.0
nc-agent.sources.nc-agent-netcat-source.port = 40001


# ---------------   nc-agent-memory-channel      -------------------

nc-agent.channels.nc-agent-memory-channel.type = memory
nc-agent.channels.nc-agent-memory-channel.capacity = 1000
nc-agent.channels.nc-agent-memory-channel.transactionCapacity = 1000
nc-agent.channels.nc-agent-memory-channel.byteCapacityBufferPercentage = 20
nc-agent.channels.nc-agent-memory-channel.byteCapacity = 800000

# ---------------   nc-agent-avro-sink      -------------------

nc-agent.sinks.nc-agent-avro-sink.channel = nc-agent-memory-channel
nc-agent.sinks.nc-agent-avro-sink.type = avro
# connect to the downstream avro source (logger-agent, listening on 40010)
nc-agent.sinks.nc-agent-avro-sink.hostname = localhost
nc-agent.sinks.nc-agent-avro-sink.port = 40010


# -----------------------------------   divider      --------------------------------------------



# ---------------   exec-agent      -------------------

exec-agent.sources = exec-agent-exec-source
exec-agent.channels = exec-agent-memory-channel
exec-agent.sinks = exec-agent-avro-sink

# ---------------   exec-agent-exec-source      -------------------

exec-agent.sources.exec-agent-exec-source.channels = exec-agent-memory-channel
exec-agent.sources.exec-agent-exec-source.type = exec
exec-agent.sources.exec-agent-exec-source.command  = tail -F /home/hadoop/data/test.log


# ---------------   exec-agent-memory-channel      -------------------

exec-agent.channels.exec-agent-memory-channel.type = memory
exec-agent.channels.exec-agent-memory-channel.capacity = 1000
exec-agent.channels.exec-agent-memory-channel.transactionCapacity = 1000
exec-agent.channels.exec-agent-memory-channel.byteCapacityBufferPercentage = 20
exec-agent.channels.exec-agent-memory-channel.byteCapacity = 800000

# ---------------   exec-agent-avro-sink      -------------------

exec-agent.sinks.exec-agent-avro-sink.channel = exec-agent-memory-channel
exec-agent.sinks.exec-agent-avro-sink.type = avro
# connect to the downstream avro source (logger-agent, listening on 40010)
exec-agent.sinks.exec-agent-avro-sink.hostname = localhost
exec-agent.sinks.exec-agent-avro-sink.port = 40010


# -----------------------------------   divider      --------------------------------------------

# ---------------   logger-agent      -------------------

logger-agent.sources = logger-agent-avro-source
logger-agent.channels = logger-agent-file-channel
logger-agent.sinks = logger-agent-logger-sink

# ---------------   logger-agent-avro-source      -------------------

logger-agent.sources.logger-agent-avro-source.channels = logger-agent-file-channel
logger-agent.sources.logger-agent-avro-source.type = avro
logger-agent.sources.logger-agent-avro-source.bind = 0.0.0.0
logger-agent.sources.logger-agent-avro-source.port  = 40010


# ---------------   logger-agent-file-channel      -------------------

logger-agent.channels.logger-agent-file-channel.type = file
logger-agent.channels.logger-agent-file-channel.checkpointDir  = /home/hadoop/script/flume/agree-agent/logger-agent-checkpoint
logger-agent.channels.logger-agent-file-channel.dataDirs  = /home/hadoop/script/flume/agree-agent/logger-agent-data

# ---------------   logger-agent-logger-sink      -------------------

logger-agent.sinks.logger-agent-logger-sink.channel = logger-agent-file-channel
logger-agent.sinks.logger-agent-logger-sink.type = logger




flume-ng agent \
--name logger-agent  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55560

flume-ng agent \
--name nc-agent  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55561

flume-ng agent \
--name exec-agent  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55562
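The avro sinks can only deliver once the downstream avro source is listening, so it is simplest to start logger-agent first, then nc-agent and exec-agent. A quick end-to-end test:

# events sent to nc-agent's netcat source should appear in logger-agent's console
echo "multi-tier test" | nc localhost 40001

# lines appended to the tailed file flow through exec-agent
echo "from exec" >> /home/hadoop/data/test.log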