多级采集流
在实际生产中,日志采集一般架构为多级采集流. 在Flume中,多级采集流使用 AVRO-Sink 与 AVRO-Source 配对来完成.
AVRO-Source
Avro Source使用Netty服务器来处理传入的请求.Netty使用Java的非阻塞I/O(NIO),所以效率较高
AVRO-Source 配置
配置名 | 描述 |
---|---|
type | avro |
bind | 监听的机器名或IP |
port | 监听的端口 |
threads | 最大处理工作线程数 默认无穷大 |
compression-type | 压缩方式 可选:none(不压缩) deflate(使用压缩) |
ipFilter | netty的IP过滤器 |
ipFilterRules | netty的IP过滤规则 如allow:ip:127.,allow:name:localhost,deny:ip: |
Channel Selector
通道选择器(Channel Selector)是一种Source上的概念
它的意思是,在Source采集到Channel的过程中,能够依据一定的策略进行Channel选择
依靠通道选择器的特性,可以提供诸如通道路由,分流,高可用等等功能
Replicating Channel Selector
Replicating 会将source过来的events发往所有channel.
Multiplexing Channel Selector
Multiplexing 可以选择该发往哪些channel
Sink Processor
Sink Processor 是 Sink上的概念.
默认的Sink,只能接收一个Sink Process.但可以配置多个Sink以增加分流以及高可用等等处理
配置多个Sink与Single Sink机制没有冲突
因为这里的多个Sink只是为了负载均衡以及容灾,最终还是只有一个Sink实际处理*
Failover Sink Processor
Failover Sink Processor 维护一个带优先级排序的 Sink 列表,它保证一个Event会被这个列表中的某一个处理
工作原理
它的工作方式是内部有两个Sink池.活动池与故障池.这两者都是优先级排序的.
Sink过程总是获取当前活动池中优先级最大的一个.如果一个Sink被请求失败,会移动到故障池中并设置一个恢复时间.每个故障池中的Sink恢复时间到后都会重新进入活动池.
这样随着Sink在活动池和故障池中不断的进出,就能以可用且优先的顺序持续使用Sink
配置详解
配置节 | 描述 |
---|---|
processor.type | Failover Sink Processor类型为: failover |
processor.priority. |
优先级 值越大表示优先级越高. 注意优先级级别必须唯一 |
processor.maxpenalty | 失败Sink的最大恢复冷却时间 单位:毫秒 默认:30000 |
Load balancing Sink Processor
负载均衡处理器(Load balancing Sink Processor),提供了在多个接收器上负载均衡流的能力
均衡机制
内置均衡选择机制为round_robin或随机选择机制分配负载
该选择器使用其配置的选择机制选择下一个接收器并调用它。
容错机制与黑名单
如果所选的接收器未能传递事件,处理器通过其配置的选择机制选择下一个可用的接收器。 此实现没有将失败的sink列入黑名单,而是继续乐观地尝试每个可用的sink。如果所有的接收器调用都导致失败,那么选择器将失败传播给接收器运行器。
如果启用了backoff, sink处理器将会将失败的接收器列入黑名单,并将其删除,以便选择一个给定的超时。当超时结束时,如果接收器仍然是无响应的超时将按指数增加,以避免可能陷入对无响应接收器的长时间等待。禁用此功能后,在round_robin中,所有失败的接收器负载将被依次传递到下一个接收器,因此不能均衡
配置详解
配置节 | 描述 |
---|---|
processor.type | Load balancing Sink Processor类型为: load_balance |
processor.backoff | 是否开启黑名单机制 默认false |
processor.selector | 可选值 round_robin(默认),random |
processor.selector.maxTimeOut | 黑名单恢复冷却 单位毫秒 默认(30000) |
拦截器
# --------------- Agent -------------------
# netcat-source采集 replicating推送至 logger-channel,hdfs-channel
agent.sources = netcat-source
agent.channels = logger-channel hdfs-channel
agent.sinks = logger-sink hdfs-sink
# --------------- Source -------------------
agent.sources.netcat-source.type = netcat
agent.sources.netcat-source.bind = 0.0.0.0
agent.sources.netcat-source.port = 44444
agent.sources.netcat-source.selector.type = replicating
agent.sources.netcat-source.channels = logger-channel hdfs-channel
# --------------- Channel -------------------
# logger-channel
agent.channels.logger-channel.type = memory
agent.channels.logger-channel.capacity = 10000
agent.channels.logger-channel.transactionCapacity = 10000
agent.channels.logger-channel.byteCapacityBufferPercentage = 20
agent.channels.logger-channel.byteCapacity = 800000
# hdfs-channel
agent.channels.hdfs-channel.type = memory
agent.channels.hdfs-channel.capacity = 10000
agent.channels.hdfs-channel.transactionCapacity = 10000
agent.channels.hdfs-channel.byteCapacityBufferPercentage = 20
agent.channels.hdfs-channel.byteCapacity = 800000
# --------------- Sink -------------------
# logger-sink
agent.sinks.logger-sink.type = logger
agent.sinks.logger-sink.channel = logger-channel
# hdfs-Sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.channel = hdfs-channel
agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop000:9000/flume/test/netcat-memory-select-logger-hdfs/dt=%y-%m-%d/
agent.sinks.hdfs-sink.hdfs.filePrefix = events-
agent.sinks.hdfs-sink.hdfs.fileSuffix = .log
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.rollInterval = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 0
agent.sinks.hdfs-sink.hdfs.rollSize = 100
agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
flume-ng agent \
--name agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/netcat-memory-select-logger-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55555
# --------------- nc-agent -------------------
nc-agent.sources = nc-agent-netcat-source
nc-agent.channels = nc-agent-memory-channel
nc-agent.sinks = nc-agent-avro-sink
# --------------- nc-agent-netcat-source -------------------
nc-agent.sources.nc-agent-netcat-source.channels = nc-agent-memory-channel
nc-agent.sources.nc-agent-netcat-source.type = netcat
nc-agent.sources.nc-agent-netcat-source.bind = 0.0.0.0
nc-agent.sources.nc-agent-netcat-source.bind = 40001
# --------------- nc-agent-memory-channel -------------------
nc-agent.channels.nc-agent-memory-channel.type = memory
nc-agent.channels.nc-agent-memory-channel.capacity = 1000
nc-agent.channels.nc-agent-memory-channel.transactionCapacity = 1000
nc-agent.channels.nc-agent-memory-channel.byteCapacityBufferPercentage = 20
nc-agent.channels.nc-agent-memory-channel.byteCapacity = 800000
# --------------- nc-agent-avro-sink -------------------
nc-agent.sinks.nc-agent-avro-sink.channel = nc-agent-memory-channel
nc-agent.sinks.nc-agent-avro-sink.type = avro
nc-agent.sinks.nc-agent-avro-sink.hostname = 0.0.0.0
nc-agent.sinks.nc-agent-avro-sink.port = 40010
# ----------------------------------- 华丽的分割线 --------------------------------------------
# --------------- exec-agent -------------------
exec-agent.sources = exec-agent-exec-source
exec-agent.channels = exec-agent-memory-channel
exec-agent.sinks = exec-agent-avro-sink
# --------------- exec-agent-netcat-source -------------------
exec-agent.sources.exec-agent-exec-source.channels = exec-agent-memory-channel
exec-agent.sources.exec-agent-exec-source.type = exec
exec-agent.sources.exec-agent-exec-source.command = tail -F /home/hadoop/data/test.log
# --------------- exec-agent-memory-channel -------------------
exec-agent.channels.exec-agent-memory-channel.type = memory
exec-agent.channels.exec-agent-memory-channel.capacity = 1000
exec-agent.channels.exec-agent-memory-channel.transactionCapacity = 1000
exec-agent.channels.exec-agent-memory-channel.byteCapacityBufferPercentage = 20
exec-agent.channels.exec-agent-memory-channel.byteCapacity = 800000
# --------------- exec-agent-avro-sink -------------------
exec-agent.sinks.exec-agent-avro-sink.channel = exec-agent-memory-channel
exec-agent.sinks.exec-agent-avro-sink.type = avro
exec-agent.sinks.exec-agent-avro-sink.hostname = 0.0.0.0
exec-agent.sinks.exec-agent-avro-sink.port = 40010
# ----------------------------------- 华丽的分割线 --------------------------------------------
# --------------- logger-agent -------------------
logger-agent.sources = logger-agent-avro-source
logger-agent.channels = logger-agent-file-channel
logger-agent.sinks = logger-agent-logger-sink
# --------------- logger-agent-avro-source -------------------
logger-agent.sources.logger-agent-avro-source.channels = logger-agent-file-channel
logger-agent.sources.logger-agent-avro-source.type = avro
logger-agent.sources.logger-agent-avro-source.bind = 0.0.0.0
logger-agent.sources.logger-agent-avro-source.port = 40010
# --------------- logger-agent-file-channel -------------------
logger-agent.channels.logger-agent-file-channel.type = file
logger-agent.channels.logger-agent-file-channel.checkpointDir = /home/hadoop/script/flume/agree-agent/logger-agent-checkpoint
logger-agent.channels.logger-agent-file-channel.dataDirs = /home/hadoop/script/flume/agree-agent/logger-agent-data
# --------------- logger-agent-logger-sink -------------------
logger-agent.sinks.logger-agent-logger-sink.channel = logger-agent-file-channel
logger-agent.sinks.logger-agent-logger-sink.type = logger
flume-ng agent \
--name logger-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55560
flume-ng agent \
--name nc-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55561
flume-ng agent \
--name exec-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/agree-agent.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=55562