NightPxy 个人技术博客

Flume 使用

Posted on By NightPxy

简介

Flume 是 cloudera 开发的实时日志收集系统

版本

Flume 有两大版本.

  • og版
    这是Flume的初始发行版本.但因为设计不合理,臃肿且BUG极多,这是已经被淘汰使用的版本
  • ng版
    鉴于og(0.9.4)中日志传输极不稳定的情况,cloudera公司决心推翻og版重构.这就是ng版

特点

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
Flume 支持非常多的日志系统的数据发送方,也具备将日志数据写到各种数据接收方的能力

架构

Flume的数据流由Event贯穿始终. 而Event是Flume的最小数据单位.
Flume通过数据接收组件(Source),收集数据并组装成Event数据,然后将Event数据写到队列暂存介质(Channel),再由数据输出组件(Sink),将Event数据从Channel中取出并输出到目标数据接收端

核心概念

以下讨论的Flume的目标版本为 flume-ng-1.6.0-cdh5.7.0

Event
Event是Flume的最小数据单位.它包含两个部分

  • 头(head)
  • 日志数据(字节数组形式)

Agent
一个独立的Flume进程(jvm进程).它包含组件 Source,Channel,Sink,以及它们相互之间的关系

Source
数据收集组件.它可以有多张收集方式. 比如 文件,文件夹

Channel
Event的中转暂存器.它可以有多种介质.比如 memory,file 等等

Sink
数据输出组件.它可以有多种输出方式.比如 控制台,HDFS,kafka 等等

核心组件

Source

Exec-Source 官方说明

Source是Flume的数据收集组件.
不同的收集方式具有不同数据可靠型(Flume重新启动或异常终止,不会丢失数据)
比较常用的收集方式如下

Exec-Source

运行给定Linux命令,并在其标准输出上进行持续监控

可靠性 Exec-Source 是一种不可靠性收集.
比如 tail -F 每次收集将会始终读取最新数据.如果Flume中断,中断时间内数据将无法读取从而导致数据丢失

配置例子

# 命令
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

# 支持Shell方式
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

Spooling-Directory-Source

Spooling-Directory-Source 官方说明

监控读取文件夹中的新文件

可靠性
它的可靠性机制是 确定在读取文件完毕后将目标重命名(或删除). 这样如果在完整读取过程中终止,将会重读文件数据而不会发生数据丢失.

注意

  • Spooling-Directory-Source 不允许文件修改的情况.它仅支持类似离线数据的形式
  • 每次新文件的文件名必须是唯一的(官方建议时间戳).
    Flume将会根据原始文件名(例如xx.txt)重名为已读取(xx.txt.complet).
    当再此进入xx.txt文件时会继续重名为(xx.txt.complet),此时即不被允许的.
    务必注意这种情况,因为此时 Flume的Agent将会直接退出

配置例子

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
# 每N个Event作为一个批次写到Channel.默认100
a1.sources.src-1.batchSize = 100

Taildir-Source

Taildir-Source 官方说明

以一种接近实时的方式监控一些目标文件的增量内容. (暂不支持 windows 操作系统)

可靠性

Taildir-Source 是一种可靠数据源.
它的可靠性机制是 引入一个针对每个文件的已读取偏移量.在每个文件每成功读取一行就移动该文件的读取偏移量.这样在异常终止情况下,将可以根据这个偏移量来继续完成的读取来保证数据不丢失

注意

  • 监控多个文件下,将按照文件修改时间排序.即文件修改时间越老读取越靠前
  • Taildir-Source 按行读取,暂不支持二进制或类似形式的读取
  • Taildir-Source不会重命名文件.如果文件本身被重命名,将会视为一个新的文件从头读取

配置例子

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# 偏移量存储文件
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
# 声明两个文件组 f1 f2
a1.sources.r1.filegroups = f1 f2
# f1 文件组配置
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
# f2 文件组配置
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

Channel

Memory-Channel 官方说明

Channel 是Flume中Event的中转暂存器.比较常用的介质有

Memory-Channel

将Event存储在一个可指定大小的内存队列中.
Memory-Channel 它有内存使用所带来的高吞吐量的优点和有数据丢失风险的缺点

配置例子

a1.channels = c1
a1.channels.c1.type = memory
# Memory 中最大存储的 Event 数. 默认100条
a1.channels.c1.capacity = 10000
# 从 Source 拉取 和 推送给 Sink 的每个批次大小 默认100条
a1.channels.c1.transactionCapacity = 10000
# 内存缓冲区占比 默认20%
a1.channels.c1.byteCapacityBufferPercentage = 20
# 内存最大存储 如果设置为0,则为默认200G
a1.channels.c1.byteCapacity = 800000

File-Channel

File-Channel 官方说明

将Event存储在磁盘文件中
File-Channel 将会使用一个文件路径来作为存储数据目录和CheckPoint目录.
这个目录默认是在/Home/User/.flume下

必须为每个File-Channel设置自己专有的路径
如果有多个Channel激活(这是必然的),那么只有一个Channel能锁定占用目录而导致其它的Channel激活失败. 因此,必须为每个File-Channel设置自己专有的路径

配置例子

a1.channels = c1
a1.channels.c1.type = file
# CheckPoint 存储路径
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
# 数据文件存储路径
a1.channels.c1.dataDirs = /mnt/flume/data

Sink

Sink 是Flume的数据输出组件.比较常用的如下

Logger Sink

输出日志到控制台 一般用于开发或测试调试使用

配置例子

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

HDFS-Sink

HDFS-Sink 官方说明

这是重点关注的Sink,需要仔细弄明白

序列化与压缩

HDFS-Sink支持以文本或序列文件的形式,将数据写入HDFS.并且支持对这两种文件类型的压缩

文件分割策略
HDFS-Sink支持以下三种策略周期性的滚动文件(关闭当前文件创建新文件)

  • 基于时间
  • 基于文件大小
  • 基于Event数量

文件分区 HDFS-Sink 可以根据事件发生的时间戳或机器等属性对数据进行分段或分区

配置详解

配置键 描述
type HDFS-Sink的type:hdfs
hdfs.path HDFS地址,例如:hdfs://namenode/flume/webdata/
hdfs.filePrefix 写入 hdfs 的文件名前缀
hdfs.fileSuffix 写入 hdfs 的文件名后缀.比如 .text .log
hdfs.inUsePrefix 临时文件的文件名前缀
hdfs.inUseSuffix 临时文件的文件名后缀 默认 .tmp
hdfs.rollInterval 间隔多长将临时文件滚动成最终目标文件 单位:秒 默认:30 如果设置为0 表示不根据时间来滚动文件
hdfs.rollSize 当临时文件达到多少(单位:bytes)时,滚动成目标文件. 默认:1024 如果设置成0 表示不根据临时文件大小滚动文件
hdfs.rollCount 根据Event数量滚动成目标文件. 默认10,如果设置成0,表示不根据Event数量滚动文件
hdfs.idleTimeout 默认值0,当目前临时文件在idleTimeout指定的多少秒内没有任何数据写入,则将该临时文件滚动成目标文件. 设置成0表示关闭自动关闭临时文件
hdfs.batchSize 默认值100. 每个批次刷新到 HDFS 上的 events 数量
hdfs.codeC 文件压缩格式 可选:gzip, bzip2, lzo, lzop, snappy
hdfs.fileType 默认 SequenceFile.可选 SequenceFile, DataStream(不使用压缩),CompressedStream(使用hdfs.codeC设置的压缩)
hdfs.maxOpenFiles 默认值:5000:最大允许打开的HDFS文件数. 当打开的文件数达到该值,最早打开的文件将会被关闭
hdfs.minBlockReplicas HDFS副本数 默认使用Hadoop配置的副本数 该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件
hdfs.writeFormat 写 sequence 文件的格式。包含:Text, Writable(默认)
hdfs.callTimeout 默认值:10000,执行HDFS操作的超时时间(单位:毫秒)
hdfs.threadsPoolSize 默认值:10,hdfs sink 启动的操作HDFS的线程数
hdfs.rollTimerPoolSize 默认值:1 hdfs sink 启动的根据时间滚动文件的线程数
hdfs.kerberosPrincipal HDFS安全认证kerberos配置
hdfs.kerberosKeytab HDFS安全认证kerberos配置
hdfs.proxyUser 代理用户
hdfs.round 默认值:false,是否启用时间上的”舍入”
hdfs.roundValue 默认值:1,时间上进行“舍入”的值
hdfs.roundUnit 舍入的时间单位.可选 second,minute,hour
hdfs.timeZone 默认值:Local Time,时区
hdfs.useLocalTimeStamp 默认值:flase,是否使用当地时间
hdfs.closeTries 默认值:0,hdfs sink 关闭文件的尝试次数
如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态
设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功
hdfs.retryInterval 默认值:180(秒),hdfs sink 尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1
serializer 默认值:TEXT,序列化类型

配置例子

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-

# 开启时间舍入
a1.sinks.k1.hdfs.round = true
# 时间舍入为 10 分钟
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 舍入10分钟 即每10分钟生成新的  10分钟内合并
# HDFS 输出为 /flume/events/2012-06-12/1150/00

Kafka-Sink

Kafka-Sink 官方说明

Kafka-Sink支持将数据写到 Kafka-Topic 中.(方便对接实时计算)(目前支持为Kafka-0.8.x)

Kafka-Sink 会使用的 Event-Header 属性

  • topic 如果 Event-Header 包含topic,那么Event会被发送到Kafka指定的topic中而忽略配置中指定的topic属性
  • key 如果 Event-Header 包含key,那此时的key将会被交给Kafka用与topic数据分区(同key同分区)
    如果 Event-Header 不包含key,Event会被交给随机分区

配置例子

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mytopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

重要点

transactionCapacity capacity 与 batchSize

batchSize batchSize 是 Source和Sink中的概念,它代表 写入/读取的一个批次(事务)的Event数量.
Event是Flume的最小数据单位,但在Flume的实际传输过程中,它是以N条Event组合成为一个批次进行传输的.即

  • 每事件源Event数量达到 batchSize ,Source就作为一个批次写入到Channel
  • Channel中Event数量每达到batchSize,Sink就会作为一个批次进行拉取

这三者有一个隐晦的限制是
batchsize <= transactionCapacity <= capacity

HDFS-Sink 的小文件策略

HDFS是比较害怕小文件的.因为在HDFS中,一个1byte文件和一个128M文件地位等同,一样在NN中会享有完全的索引存储.如果文件拆分过细,以此带来的文件数量的猛增会极大的影响NN的效率

Flume自身的有三种文件分割策略,在使用时务必注意处理分割时带来小文件问题

  • 基于时间切割
  • 非基于时间切割,需要注意 hdfs.idleTimeout
    在大部分的非基于时间切割场景.都会配置 hdfs.idleTimeout .
    因为现实情况下,如果一个相当长的时间没有来数据,那么无论是根据文件大小还是Event数量都有可能凑不满阈值而导致文件迟迟滚动不了
    但是 hdfs.idleTimeout 本身就可能是产生小文件的来源.因为它固定时间内必然产生文件
    因此 设置 hdfs.idleTimeout 务必要仔细对照业务场景设置 在数据及时和小文件间取得一个平衡

Sink Batch Size 也会导致小文件

HDFS-Sink 的 round 和 roll

rollInterval- 表示每间隔6小时滚动
round-6小时 将一天规整为4个分区,然后落入指定分区

# agent 声明 
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source 设置
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/script/flume/test/a1_taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data/test.log
a1.sources.r1.batchSize = 10

# channel 设置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

# sink 设置
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://hadoop000:9000/flume/test/taildir-memory-hdfs/dt=%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileSuffix = .log

a1.sinks.k1.hdfs.minBlockReplicas=1
# 关闭时间滚动文件
a1.sinks.k1.hdfs.rollInterval = 0
# 关闭Event数量滚动文件
a1.sinks.k1.hdfs.rollCount = 0
# 开启按临时文件大小滚动分区 
a1.sinks.k1.hdfs.rollSize = 200
# 按时间 1分钟 分区
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 1
#a1.sinks.k1.hdfs.roundUnit = minute

./flume-ng agent \
--name a1  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/test/taildir-memory-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=44444

启动案例

./flume-ng agent \
--name a1  \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/hello.conf \
-Dflume.root.logger=INFO,console 	// 指定日志级别(建议使用起来)
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=44444

参考文档

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/