准备资源:apache-flume-1.9.0-bin.tar.gz

一、Flume概述

1. 概念:

Flume是海量日志采集、聚合、传输的系统。

2. 基础架构:

组件

名称

描述
Source 负责收集数据,将收集到的数据封装成Event,Event会进入到Channel中
Event

对日志数据的封装,Event内部数据结构由两部分构成:1. Header部分,用K-V的形式存储一些关键信息;2. Body部分:存储了实际的日志数据内容;

默认情况下,Header中是没有数据的。

Channel 一层数据缓冲,类似于Linux的管道,存放Event对象,供Sink拉取
Sink 负责将从Channel拉取的数据传输到指定的位置
Agent 是一个JVM进程,由Source、Channel、Sink三部分组成,并控制它们协作

Source后可以接多个Channel,Sink前只能接一个Channel;

二、原理

1. Flume事务

Sink端的事务处理,保证了数据绝对不会丢失,但是可能会导致重复处理数据;

Source传入数据大小/Sink拉取数据大小 = batchSize

eventList大小 = transactionCapacity

Channel大小 = capacity

三者关系为:batchSize <= transactionCapacity <= capacity

2. Agent原理

组件名称 描述
Interceptor 拦截器,event一定会经过拦截器,并在拦截器中执行一些逻辑判断处理
Channel Selector

Channel Selector常用类型:

(1) Replicating Channel Selector: 将传入的event分别转发给Channel,每个Channel中的event都是一样的。(默认使用)

(2)Multiplexing Channel Selector: 将传入的event经过一定逻辑判断,分别转发给不同的Channel。

Sink Processor

Sink Processor常用类型:

(1) Default Sink Processor: Channel后只能跟单个Sink,该Sink从Channel中拉取event并进行处理。(默认使用)

(2) Load balancing Sink Processor:Channel后跟多个Sink,轮询或者随机让Sink从Channel中拉取event,达到负载均衡。

(3) Failover Sink Processor:Channel后跟多个Sink,只能同时有一个Sink提供服务,只有当前提供服务的Sink挂了之后,才会切换其他Sink提供服务。

3. 拓扑结构

(1) 基础拓扑结构

(2) 简单串联拓扑结构

(3) 复制或多路复用拓扑结构

(4) 负载均衡或故障转移拓扑结构

(5) 聚合

四、安装以及使用

1. 安装步骤

(1) 上传apache-flume-1.9.0-bin.tar.gz至虚拟机/opt/software下并将其解压缩到/opt/moudle目录下,然后将其改名

# 来到/opt/software
cd /opt/software
# 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ../moudle/
# 改名
mv ../moudle/apache-flume-1.9.0-bin/ ../moudle/flume-1.9.0

(2) 配置环境变量

# 打开自定义的环境变量脚本文件
sudo vim /etc/profile.d/my-env.sh# 环境变量内容为
# FLUME环境变量
export FLUME_HOME=/opt/moudle/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin# 使环境变量生效
source /etc/profile

(3) 删除冲突jar包

# 来带Flume存放jar包的目录
cd $FLUME_HOME/lib
# 删除冲突jar包
rm -rf guava*

(4) 修改配置文件

# 来到配置目录下
cd $FLUME_HOME/conf# 打开配置文件
vim log4j.properties# 修改日志存放的位置
# 需要修改的内容为:
flume.log.dir=/opt/moudle/flume-1.9.0

2. 使用

要想使用flume,必须先写配置文件,配置文件的写法如下:

# Named
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2# Source
# 对r1的配置
a1.sources.r1.xxx = ...
...
# 对r2的配置
a1.sources.r2.xxx = ...
...# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.xxx = ...
...
# 对c2的配置
a1.channels.c2.xxx = ...
...# Sink Processor
...# Sink
# 对k1的配置
a1.sinks.k1.xxx = ...
...
# 对k2的配置
a1.sinks.k2.xxx = ...
...# Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

假设上面写的配置文件为:test.conf,启动一个agent的命令为:

#  -Dflume.root.logger=INFO,console可以不加,加上是为了将flume的运行日志信息打印到控制台上
# 便于调试
flume-ng agent -c $FLUME_HOME/conf -f your-path/test.conf -n a1 -Dflume.root.logger=INFO,console

五、案例

官方组件使用文档

1. 从端口采集数据,并将采集到的数据打印到终端上

(1) 架构

(2) 配置文件

# 移动到$FLUME_HOME下,新建目录名为:conf-files
# 该目录的作用是用来存放配置文件的
cd $FLUME_HOME
mkdir -p ./conf-files# 在conf-files下新建子目录netcat目录,用来存放具体的配置文件
mkdir -p ./conf-files/netcat# 新建配置文件为:netcat-logger.conf
vim ./conf-files/netcat/netcat-logger.conf# 配置文件内容如下
#Namned
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# Source的类型
a1.sources.r1.type = netcat
# 监听的IP地址
a1.sources.r1.bind = hadoop101
# 监听的端口号
a1.sources.r1.port = 6666# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a1.sinks.k1.type = logger# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出即可

(3) 启动

# 先启动agent,让其监听hadoop101的6666端口
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/netcat/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console# 开启一个新的终端界面,用来连接hadoop101的6666端口,向该端口发送数据
nc hadoop101 6666# 进入发送数据的界面,输入内容,按回车键即可

2. (只能)监控单个文件,当向文件中添加内容是,将原内容(原内容只上传一次)以及添加内容一并上传到HDFS中

(1) 架构

(2) 配置文件

# 来到conf-files目录下,新建文件夹exec-hdfs目录
cd $FLUME_HOME/conf-files
mkdir -p exec-hdfs# 于exec-hdfs目录下新建配置文件
vim exec-hdfs/exec-hdfs.conf# 配置内容为
#Namned
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# Source的类型
a1.sources.r1.type = exec
# 要执行的linux命令
a1.sources.r1.command = tail -F /opt/moudle/flume-1.9.0/files/test1.log# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a1.sinks.k1.type = hdfs
# 数据在hdfs存放的目录
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 每隔多少秒,将正在写入的文件向下滚动
a1.sinks.k1.hdfs.rollInterval = 1800
# 当前使用文件达到多少字节之后,向下滚动
a1.sinks.k1.hdfs.rollSize = 1024
# 当前使用文件中存放了多少个event后,向下滚动,禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 处理数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件的存储格式为压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream
# 设置具体的压缩算法
a1.sinks.k1.hdfs.codeC = snappy
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出即可

注意:如果本机安装了hadoop,直接写路径就可以;如果本机没有安装hadoop,需要写一个可以连的上的nn(必须是active的)地址,例如:hdfs://hadoop101:8020hdfs-path。

(3) 启动

# 首先启动hadoop集群(本文中使用的是自定义启停脚本)
hadoop-ha.sh start# 启动agent
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/exec-hdfs/exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console# 向目标监控文件追加内容
echo "something" >> $FLUME_HOME/files/test1.log

3. 监控目录,当有新文件产生后,将新文件的内容采集到HDFS

(1) 架构

(2) 配置文件

# 来到conf-files目录下,新建spooldir-hdfs目录
cd $FLUME_HOME/conf-files
mkdir -p spooldir-hdfs# 于spooldir-hdfs目录下新建配置文件:spooldir-hdfs.conf
vim spooldir-hdfs/spooldir-hdfs.conf# 配置文件内容为
#Namned
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# Source的类型
a1.sources.r1.type = spooldir
# 要监控的目录
a1.sources.r1.spoolDir = /opt/moudle/flume-1.9.0/files
# 已经被采集过的文件的后缀名,根据后缀名判断文件是否被采集过,默认值即为:.COMPLETED
a1.sources.r1.fileSuffix = .COMPLETED
# 要监控目录下的哪些文件,支持使用正则表达式
a1.sources.r1.includePattern = .+\.log
# 哪些文件不被监控,支持使用正则表达式
a1.sources.r1.ignorePattern = .+\.txt# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a1.sinks.k1.type = hdfs
# 数据在hdfs存放的目录
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 每隔多少秒,将正在写入的文件向下滚动
a1.sinks.k1.hdfs.rollInterval = 1800
# 当前使用文件达到多少字节之后,向下滚动
a1.sinks.k1.hdfs.rollSize = 1024
# 当前使用文件中存放了多少个event后,向下滚动,禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 处理数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件的存储格式为压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream
# 设置具体的压缩算法
a1.sinks.k1.hdfs.codeC = snappy
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存并退出

(3) 启动

# 事先准备好4个文件:test1.log、test2.log、test1.txt、test2.txt
# 启动hadoop,本文使用自定义的启停脚本
hadoop-ha.sh start# 将4个文件全部放入files文目录中(监控files目录)
mv test?.* files# 发现目录中文件名变为:test1.log.COMPLETED、test2.log.COMPLETED、test1.txt、test2.txt

4. 监控多个文件,当向文件末尾新增内容后,将原内容(原内容只上传一次)和新增内容采集到HDFS

(1) 架构

(2) 配置文件

# 来到conf-files目录下,新建目录:taildir-hdfs
cd $FLUME_HOME/conf-files
mkdir -p taildir-hdfs# 于taildir-hdfs目录下新建配置文件:taildir-hdfs.conf
vim taildir-hdfs.conf# 配置内容为
#Namned
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# Source的类型
a1.sources.r1.type = TAILDIR
# 将要监控的文件整合为文件组,可以监控多个文件组,该配置项用来命名
a1.sources.r1.filegroups = g1 g2
# 配置每个文件组内所含的文件,文件名支持使用正则表达式
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
a1.sources.r1.filegroups.g2 = /opt/moudle/flume-1.9.0/files/.+\.txt
# 支持断点续传,需要记录的信息都记录在该配置项的json文件当中
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a1.sinks.k1.type = hdfs
# 数据在hdfs存放的目录
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 每隔多少秒,将正在写入的文件向下滚动
a1.sinks.k1.hdfs.rollInterval = 1800
# 当前使用文件达到多少字节之后,向下滚动
a1.sinks.k1.hdfs.rollSize = 1024
# 当前使用文件中存放了多少个event后,向下滚动,禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 处理数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 设置文件的存储格式为压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream
# 设置具体的压缩算法
a1.sinks.k1.hdfs.codeC = snappy
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

(3) 启动

# 在files目录下准备4个文件:test1.log、test2.log、test1.txt、test2.txt
# g1组的文件集合为:test1.log、test2.log
# g2组的文件集合为:test1.txt、test2.txt# 启动hadoop集群,本文中使用自定义启停脚本
hadoop-ha.sh start# 向两个组内的文件追加内容
echo "test1.log file1" >> test1.log
echo "test1.txt file1" >> test1.txt

5. 启动两个Agent进程:a1、a2(如果在同一台机器上,需要为Agent起不同的名称;如果不在同一台机器上,名称可以相同);a1负责监控文件,读取文件的追加内容,传输文件内容到a2,a2将从a1读取的数据内容打印到客户端界面

(1) 架构

(2) 配置文件

# 来到conf-files目录下,新建目录:series-connection
cd $FLUME_HOME/conf-files
mkdir -p series-connection

新建a1.conf文件

# 于目录series-connection下创建配置文件:a1.conf
vim series-connection/a1.conf# 配置文件内容为:
#Namned
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# Source的类型
a1.sources.r1.type = TAILDIR
# 将要监控的文件整合为文件组,可以监控多个文件组,该配置项用来命名
a1.sources.r1.filegroups = g1
# 配置每个文件组内所含的文件,文件名支持使用正则表达式
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+
# 支持断点续传,需要记录的信息都记录在该配置项的json文件当中
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a1.sinks.k1.type = avro
# 要连接的IP
a1.sinks.k1.hostname = hadoop101
# 要连接的端口号
a1.sinks.k1.port = 6666
# 拉取数据的批次大小
a1.sinks.k1.batch-size = 1000# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

新建a2.conf文件

# 于目录series-connection下创建配置文件:a2.conf
vim series-connection/a2.conf# 配置文件内容为:
#Namned
a2.sources = r1
a2.channels = c1
a2.sinks = k1# Source
# Source的类型
a2.sources.r1.type = avro
# 要监听的IP
a2.sources.r1.bind = hadoop101
# 要监听的端口号
a2.sources.r1.port = 6666# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating# Channel
# Channel的类型
a2.channels.c1.type = memory
# Channel的最大容量为10000个event
a2.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a2.channels.c1.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# Sink的类型
a2.sinks.k1.type = logger# Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1# 保存退出

(3) 启动

# 首先启动a2,让其监听到端口
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/series-connection/a2.conf -n a2 -Dflume.root.logger=INFO,console# 启动a2,让其连接到a2监听的端口
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/series-connection/a1.conf -n a1 -Dflume.root.logger=INFO,console# 底层用的实际上是Java网络编程,必须先把服务端启动之后,客户端才连得上,否则报异常

6. 监控端口,将发往端口的数据传输到三个地方:本地文件中、hdfs中、打印到客户端界面中

(1) 架构

(2) 配置文件

# 来到conf-files目录下,新建目录:replicating
cd $FLUME_HOME/conf-files
mkdir -p replicating# 于replicating目录下新建配置文件:replicating.conf
vim replicating/replicating.conf# 配置文件内容为:
#Namned
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3# Source
# Source的类型
a1.sources.r1.type = netcat
# 监听的IP地址
a1.sources.r1.bind = hadoop101
# 监听的端口号
a1.sources.r1.port = 6666# Interceptors
# 没有拦截器# Channel Selector
# 选择器默认为:replicating
a1.sources.r1.selector.type = replicating# Channel
# c1
# Channel的类型
a1.channels.c1.type = memory
# Channel的最大容量为10000个event
a1.channels.c1.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c1.transactionCapacity = 1000# c2
# Channel的类型
a1.channels.c2.type = memory
# Channel的最大容量为10000个event
a1.channels.c2.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c2.transactionCapacity = 1000# c3
# Channel的类型
a1.channels.c3.type = memory
# Channel的最大容量为10000个event
a1.channels.c3.capacity = 10000
# Channel的事务容量设置为1000
a1.channels.c3.transactionCapacity = 1000# Channel Processor
# 走默认,一个Channel后只能接一个Sink# Sink
# k1
# Sink的类型
a1.sinks.k1.type = file_roll
# 本地文件保存的路径
a1.sinks.k1.sink.directory = /opt/moudle/flume-1.9.0/files/
# 每隔多少秒滚动文件
a1.sinks.k1.sink.rollInterval = 3600
# 拉取数据的批次大小
a1.sinks.k1.batch-size = 1000# k2
# Sink的类型
a1.sinks.k2.type = hdfs
# hdfs的存放数据文件的路径
a1.sinks.k2.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多长时间滚动一次文件
a1.sinks.k2.hdfs.rollInterval = 3600
# 文件达到多大滚动一次
a1.sinks.k2.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k2.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k2.hdfs.batchSize = 1000
# 存储的文件类型为普通文本文件
a1.sinks.k2.hdfs.fileType = DataStream
# 是否使用本地时间进行时间相关运算,设置为使用本地时间
a1.sinks.k2.hdfs.useLocalTimeStamp = true# k3
# Sink的类型
a1.sinks.k3.type = logger# Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3# 保存后退出

(3) 启动

# 启动命令为:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/replicating/replicating.conf -n a1 -Dflume.root.logger=INFO,console# 使用nc命令连接
nc hadoop101 6666# 输入消息,摁回车键即可

7. 监控端口,将采集到的网址发往不同的地方,要求含有内容为baidu的网址发往hdfs中,内容含有iqiyi的网址直接打印到客户端中,其它的数据丢弃

(1) 架构

(2) 构建

a. 编写拦截器

引入依赖为:

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency></dependencies>

编写代码为:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class WebsiteInterceptor implements Interceptor {/*** 初始化方法*/@Overridepublic void initialize() {}/*** 核心处理方法* @param event event对象* @return event对象*/@Overridepublic Event intercept(Event event) {// 获取headerMap<String, String> header = event.getHeaders();// 获取bodyString website = new String(event.getBody(), StandardCharsets.UTF_8);if (website.contains("baidu")){header.put("website", "baidu");}else if (website.contains("iqiyi")){header.put("website", "iqiyi");}else {event = null;}return event;}/*** 核心处理方法* @param list event list* @return event list*/@Overridepublic List<Event> intercept(List<Event> list) {List<Event> res = new ArrayList<>();for (Event event : list) {if (intercept(event) != null){res.add(event);}}return res;}/*** 释放资源的方法*/@Overridepublic void close() {}/*** 将来flume是通过反射出该类创建拦截器对象的*/public static class Builder implements Interceptor.Builder{/*** 该方法用来获取拦截器对象* @return 拦截器对象*/@Overridepublic Interceptor build() {return new WebsiteInterceptor();}@Overridepublic void configure(Context context) {/** 可以通过context对象读取拦截器的属性值:* a1.sources.r1.interceptors.i1.mypro = mypro* 可以通过:context.getString(mypro, defaultMyproValue)读取mypro的属性值*/}}
}

b. 配置文件

# 将上面编写的拦截器打包,放在flume的lib目录下即可# 来到conf-files目录下,创建新目录:multiplexing
cd $FLUME_HOME/conf-files
mkdir -p multiplexing# 于multiplexing目录下编写配置文件:multiplexing.conf
# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2# Source
# 对r1的配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 6666# Interceptors
# 拦截器链
a1.sources.r1.interceptors = i1
# 指定每个拦截器的类型(完整类名)
a1.sources.r1.interceptors.i1.type = com.fjg.interceptors.WebsiteInterceptor$Builder# Channel Selector
# 选择器的类型
a1.sources.r1.selector.type = multiplexing
# 选择器从event中提取的header属性,根据该属性决定event该发往哪个channel中
a1.sources.r1.selector.header = website
# 匹配属性值(属性值从header中取)和要发往的channel
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.iqiyi = c2# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# 对c2的配置
a1.channels.c2.type = memory
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.capacity = 10000# Sink Processor# Sink
# 对k1的配置
# Sink的类型
a1.sinks.k1.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k1.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k1.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 对k2的配置
# Sink的类型
a1.sinks.k2.type = logger# Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2# 保存并退出

(3) 启动

# 启动命令为:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/multiplexing/multiplexing.conf -n a1 -Dflume.root.logger=INFO,console# 使用nc命令连接
nc hadoop101 6666# 发送内容含有baidu会存储到hdfs,含有iqiyi会直接打印到客户端窗口中,其他内容直接抛弃

8. 监控多个文件,将文件原内容(只上传一次原内容)以及追加内容上传至hdfs中,要求保证Sink具有较高的可靠性

(1) 架构

a.负载均衡

b. 故障转移

(2) 配置文件

a. 负载均衡

采用轮询方式:

# 来到conf-files目录下,新建目录:dependency
cd $FLUME_HOME/conf-files
mkdir -p dependency# 于dependency目录下新建配置文件:dependency.conf
vim dependency/dependency.conf# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 k3# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = TAILDIR
# 文件组有哪些,每一个文件组是要监控文件的集合
a1.sources.r1.filegroups = g1 g2
# g1组要监控的文件集合
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
a1.sources.r1.filegroups.g2 = /opt/moudle/flume-1.9.0/files/.+\.txt
# 断点续传,记录信息的文件
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
# Sink组有哪些
a1.sinkgroups = g1
# g1组包含的sink有哪些
a1.sinkgroups.g1.sinks = k1 k2 k3
# 采用负载均衡的策略向sinks发送数据
a1.sinkgroups.g1.processor.type = load_balance
# 启用退避机制:某些sink如果发生问题,会有一段时间不再考虑将数据发给该sink
a1.sinkgroups.g1.processor.backoff = true
# 轮询给每个sink发送数据
a1.sinkgroups.g1.processor.selector = round_robin# Sink
# 对k1的配置
# Sink的类型
a1.sinks.k1.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k1.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k1.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 对k2的配置
# Sink的类型
a1.sinks.k2.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k2.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k2.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k2.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k2.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k2.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k2.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k2.hdfs.useLocalTimeStamp = true# 对k3的配置
# Sink的类型
a1.sinks.k3.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k3.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k3.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k3.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k3.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k3.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k3.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k3.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1# 保存退出

采用随机方式:

# 来到conf-files目录下,新建目录:dependency
cd $FLUME_HOME/conf-files
mkdir -p dependency# 于dependency目录下新建配置文件:dependency.conf
vim dependency/dependency.conf# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 k3# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = TAILDIR
# 文件组有哪些,每一个文件组是要监控文件的集合
a1.sources.r1.filegroups = g1 g2
# g1组要监控的文件集合
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
a1.sources.r1.filegroups.g2 = /opt/moudle/flume-1.9.0/files/.+\.txt
# 断点续传,记录信息的文件
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
# Sink组有哪些
a1.sinkgroups = g1
# g1组包含的sink有哪些
a1.sinkgroups.g1.sinks = k1 k2 k3
# 采用负载均衡的策略向sinks发送数据
a1.sinkgroups.g1.processor.type = load_balance
# 启用退避机制:某些sink如果发生问题,会有一段时间不再考虑将数据发给该sink
a1.sinkgroups.g1.processor.backoff = true
# 轮询给每个sink发送数据
a1.sinkgroups.g1.processor.selector = random# Sink
# 对k1的配置
# Sink的类型
a1.sinks.k1.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k1.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k1.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 对k2的配置
# Sink的类型
a1.sinks.k2.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k2.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k2.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k2.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k2.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k2.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k2.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k2.hdfs.useLocalTimeStamp = true# 对k3的配置
# Sink的类型
a1.sinks.k3.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k3.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k3.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k3.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k3.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k3.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k3.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k3.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1# 保存退出

b. 故障转移

# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 k3# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = TAILDIR
# 文件组有哪些,每一个文件组是要监控文件的集合
a1.sources.r1.filegroups = g1 g2
# g1组要监控的文件集合
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
a1.sources.r1.filegroups.g2 = /opt/moudle/flume-1.9.0/files/.+\.txt
# 断点续传,记录信息的文件
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
# Sink组有哪些
a1.sinkgroups = g1
# g1组包含的sink有哪些
a1.sinkgroups.g1.sinks = k1 k2 k3
# 采用负载均衡的策略向sinks发送数据
a1.sinkgroups.g1.processor.type = failover
# 对组内的每个Sink设置优先级,值越大优先级越高,永远都是优先级最高的Sink提供服务
# 哪怕优先级最高的Sink挂掉了,其他Sink提供服务后,优先级最高的Sink又可用了,也会切换到优先级最高的Sink提供服务
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15# Sink
# 对k1的配置
# Sink的类型
a1.sinks.k1.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k1.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k1.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 对k2的配置
# Sink的类型
a1.sinks.k2.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k2.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k2.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k2.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k2.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k2.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k2.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k2.hdfs.useLocalTimeStamp = true# 对k3的配置
# Sink的类型
a1.sinks.k3.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k3.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k3.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k3.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k3.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k3.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k3.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k3.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1

(3) 启动

# 准备好需要监控的文件
# 启动命令为:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/dependency/dependency.conf -n a1 -Dflume.root.logger=INFO,console# 向文件中追加内容

9. 聚合:存在两台服务器,这两台服务器会不断的把数据落盘成文件,现需要从两台服务器上采集这些落盘数据,将这些采集到的数据通过另外一台服务器聚合传输到hdfs上

(1) 架构

(2) 配置文件

# 来到hadoop101、hadoop102、hadoop103的conf-files目录下,新建目录:aggregation
cd $FLUME_HOME/conf-files
mkdir -p aggregation# 于aggregation目录下新建配置文件:aggregation.conf
touch aggregation/aggregation.conf

a. hadoop101的配置文件

vim aggregation/aggregation.conf# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = TAILDIR
# 文件组有哪些,每一个文件组是要监控文件的集合
a1.sources.r1.filegroups = g1
# g1组要监控的文件集合
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
# 断点续传,记录信息的文件
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
...# Sink
# 对k1的配置
a1.sinks.k1.type = avro
# 要连接的IP
a1.sinks.k1.hostname = hadoop103
# 要连接的端口号
a1.sinks.k1.port = 6666
# 拉取数据大小
a1.sinks.k1.batch-size = 1000# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

b. hadoop102的配置文件

vim aggregation/aggregation.conf# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = TAILDIR
# 文件组有哪些,每一个文件组是要监控文件的集合
a1.sources.r1.filegroups = g1
# g1组要监控的文件集合
a1.sources.r1.filegroups.g1 = /opt/moudle/flume-1.9.0/files/.+\.log
# 断点续传,记录信息的文件
a1.sources.r1.positionFile = /opt/moudle/flume-1.9.0/.flume/taildir_position.json# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
...# Sink
# 对k1的配置
a1.sinks.k1.type = avro
# 要连接的IP
a1.sinks.k1.hostname = hadoop103
# 要连接的端口号
a1.sinks.k1.port = 6666
# 拉取数据大小
a1.sinks.k1.batch-size = 1000# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

c. hadoop103的配置文件

vim aggregation/aggregation.conf# 配置文件内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = avro
# 监听的IP地址
a1.sources.r1.bind = hadoop103
# 监听的端口号
a1.sources.r1.port = 6666# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
...# Sink
# 对k1的配置
# Sink的类型
a1.sinks.k1.type = hdfs
# 文件存在hdfs的路径
a1.sinks.k1.hdfs.path = /flume/date=%Y-%m-%d/hour=%H
# 经过多少秒滚动一次文件
a1.sinks.k1.hdfs.rollInterval = 1800
# 文件达到多少字节滚动一次
a1.sinks.k1.hdfs.rollSize = 1024
# 处理多少个event之后滚动一次文件,设置为0,表示禁用该选项
a1.sinks.k1.hdfs.rollCount = 0
# 拉取数据的批次大小
a1.sinks.k1.hdfs.batchSize = 1000
# 文件的存储方式为普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# 使用本地的时间戳进行时间的相关运算
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

(3) 启动

# 在hadoop101和hadoop102上准备好要监控的文件# 首先于hadoop103上开启扮演服务端角色的agent
# 然后分别与hadoop101、hadoop102上开启扮演客户端角色的agent
# 启动命令都是一样的,为:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/aggregation/aggregation.conf -n a1 -Dflume.root.logger=INFO,console# 向准备好的监控文件添加内容即可

六、Flume组件自定义

自定义组件需要引入依赖,引入依赖为:

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency></dependencies>

1. 自定义Source

(1) Source代码为:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.JSONEvent;
import org.apache.flume.source.AbstractSource;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;public class PrefixSource extends AbstractSource implements PollableSource, Configurable {private String prefix = "";/*** 核心处理方法,在agent进程启动期间,会被循环调用,用来获取数据* @return 成功获取数据并将其传递给Channel Processor后,返回准备成功的状态;反之返回退避的状态。* @throws EventDeliveryException*/@Overridepublic Status process() throws EventDeliveryException {Status status = null;try {// 获取封装好数据的event对象Event e = getSomeData();// 将event对象交给Channel Processor进行处理getChannelProcessor().processEvent(e);status = Status.READY;} catch (Throwable t) {// 获取封装数据的event对象失败status = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;}/*** 该方法用于产生数据,并将产生的数据封装成event对象返回* @return event对象*/public Event getSomeData(){// 创建一个event对象// 还有一个实现了Event接口的实现类叫做:SimpleEvent// 两者的不同在于:JSONEvent自动使用utf-8编码方式,而SimpleEvent会自动使用平台采用的编码方式Event event = new JSONEvent();// 封装headersMap<String, String> headers = new HashMap<>();headers.put("sourceType", "userDefineSource");event.setHeaders(headers);// 随机产生出数据,并将其封装在body中event.setBody((prefix + UUID.randomUUID().toString()).getBytes(StandardCharsets.UTF_8));// 返回eventreturn event;}/*** 退避时间的增长步长* @return*/@Overridepublic long getBackOffSleepIncrement() {return 1;}/*** 最长退避时间* @return*/@Overridepublic long getMaxBackOffSleepInterval() {return 100;}/*** 读取配置,该方法最先被执行且只执行一次* @param context 上下文对象*/@Overridepublic void configure(Context context) {// 读取prefix配置// 相当于读取了:a1.sources.r1.prefix = xxxprefix = context.getString("prefix", "none");}
}

(2) 将自定义的Source打包,然后放入Flume的lib目录下

(3) 配置文件内容为

# 来到conf-files目录下,新建目录为prefixsource
cd $FLUME_HOME/conf-files
mkdir -p prefixsource# 于prefixsource目录下创建配置文件:prefixsource.conf
vim prefixsource/prefixsource.conf# 配置内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# 对r1的配置
a1.sources.r1.type = com.fjg.source.PrefixSource
a1.sources.r1.prefix = hiahia# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
...# Sink
# 对k1的配置
a1.sinks.k1.type = logger# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

(4) 启动

agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/prefixsource/prefixsource.conf -n a1 -Dflume.root.logger=INFO,console

2. 自定义Interceptor

请参考第七个案例

3. 自定义Sink

(1) Sink代码为

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** 该类会处理从Channel拉取到的event对象,首先读取配置文件中对Sink的配置值(a1.sinks.k1.prefix = xxx),* 然后将读取到的值连接到body中内容的前面,输出到日志框架中*/
public class PrefixSink extends AbstractSink implements Configurable {// 读取到的配置值,即为前缀private String prefix = "";// 打印日志的对象private static final Logger logger = LoggerFactory.getLogger(PrefixSink.class);/*** 核心处理方法,会被循环调用* @return 状态对象,如果该方法正确执行,会返回标识正确执行的状态;否则返回标识执行失败的状态对象* @throws EventDeliveryException*/@Overridepublic Status process() throws EventDeliveryException {Status status = null;// 开启从Channel中拉取数据的事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try {// 获取event对象Event event = ch.take();/**   将body中的数据拼接上prefix,输出到日志框架中*/logger.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8));// 提交事务txn.commit();status = Status.READY;} catch (Throwable t) {// 执行方法时出现异常,回滚事务txn.rollback();// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}finally {// 每一次事务之后一定要记得关闭事务,否则会抛出异常txn.close();}return status;}/*** 读取配置项的方法,最先运行且只运行一次* @param context*/@Overridepublic void configure(Context context) {// 读取prefix的配置值,如果读取失败,使用默认值prefix = context.getString("prefix", "defaultPrefix");}
}

(2) 将自定义的Sink打包,放在Flume的lib目录下

(3) 配置文件内容为:

# 来到conf-files目录下,新建目录为prefixsink
cd $FLUME_HOME/conf-files
mkdir -p prefixsink# 于prefixsink目录下创建配置文件:prefixsink.conf
vim prefixsink/prefixsink.conf# 配置内容为:
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Source
# 对r1的配置
# Source的类型
a1.sources.r1.type = netcat
# 要监听的IP
a1.sources.r1.bind = hadoop101
# 要监听的端口号
a1.sources.r1.port = 6666# Interceptors
...# Channel Selector
...# Channel
# 对c1的配置
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 10000# Sink Processor
...# Sink
# 对k1的配置
a1.sinks.k1.type = com.fjg.sink.PrefixSink
a1.sinks.k1.prefix = my_sink-# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 保存退出

(4) 启动

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/prefixsink/prefixsink.conf -n a1 -Dflume.root.logger=INFO,console

2022-08-03 Flume相关推荐

  1. 2022.08.03 第三组 高小涵

    锁: 1.synchronized(重量级锁)多线程并发编程.          JDK1.6为了减少获得锁和释放锁带来的性能消耗引入的偏向锁和轻量级锁. 拓展:  1.无锁:不加锁          ...

  2. 【2022.08.03 星期三】考研日记:英语单词

    dissipate[消散,烟消云散,浪费,挥霍] empirical[经验主义的] tobacco[烟草] taboo[禁忌,忌讳] tattoo[纹身] salient[突出的,凸出部分] depr ...

  3. 数学建模学习视频及资料集(2022.08.10)

    2021高中数学建模(应用)能力展示活动细则详解(上) 2021高中数学建模(应用)能力展示活动细则详解(上)_哔哩哔哩_bilibili 2021高中数学建模(应用)能力展示活动细则详解(下) 20 ...

  4. H5在线CAD,网页CAD,MxDraw云图平台2022.08.24更新

    SDK开发包下载地址: MxDraw云图平台 2022.08.24更新_梦想CAD控件 1. 增加对像扩展数据功能 2. 增加CAD结合GIS使用功能 https://www.mxdraw3d.com ...

  5. CSP-J1 CSP-S1 初赛 第1轮(2022.08.09)

    2022年CSP-J1 CSP-S1 第1轮初赛 报名指南 ​​​​​​2022年CSP-J1 CSP-S1 第1轮初赛 报名指南_dllglvzhenfeng的博客-CSDN博客 [教程]图文详解 ...

  6. 格式化时间 将2021-09-05T09:08:03.000Z 转换成 YYYY-MM-DD HH:mm:ss 格式

    格式化时间 将2021-09-05T09:08:03.000Z 转换成 YYYY-MM-DD HH:mm:ss 格式 1.引入第三方包 dayjs (一个轻量的处理时间和日期的javascript库) ...

  7. 八股文-- 2022.08.31

    途虎养车2022.08.31 在Java中,LinkedList类有而ArrayList类没有的方法是:removeLast()方法 LinkedList :底层基于双向链表实现,不支持高效的随机元素 ...

  8. Dapr中国社区活动之 分布式运行时开发者日 (2022.09.03)

    自2019年10月首次发布以来,Dapr(Distributed Application Runtime,分布式应用运行时)因其"更稳定"."更可靠".&quo ...

  9. 【记录ROS学习(五)2022/08/27】Win版ROS/Noetic 如何添加必要的依赖(rosdep)

    [记录ROS学习(五)2022/08/27]Win版ROS/Noetic 如何添加必要的依赖(rosdep) 近期为了使得更多的设备可以连入ROS,开始转向Windows平台的ROS,遇到一些问题 无 ...

  10. 2022年03月-电子学会青少年等级考试C语言(一级)真题与解析

    微信扫码关注公众号获取更多资讯 2022年03月软件编程(C语言)等级考试(一级) 分数:100   题数:5 时间限制:1000 ms   内存限制:65536 kB 1.双精度浮点数的输入输出 [ ...

最新文章

  1. 关于解决多台linux服务器间的文件实时同步问题
  2. unity 给图片边缘_Unity Shader 屏幕后效果——边缘检测
  3. kettle中使用javascript步骤和fireToDB函数实现自己定义数据库查询
  4. 一文看懂:搭建活动分析体系
  5. 数据库迁移_数据库迁移了解一下
  6. Confluence Cloud的Teams Message Extension
  7. Java福尔摩斯的约会大侦探福尔摩斯接到一张奇怪的字条:“我们约会吧 3485djDkxh4hhGE 2984akDfkkkkggEdsb shgsfdk dHyscvnm”。大侦探很快就明白了
  8. 1.7 编程基础之字符串 27 单词翻转 4分 python
  9. Generator 简介
  10. [obc学习日记]3.10
  11. 素材诊断分析助手_资深优化师告诉你广告投放素材都在哪找?(国内篇)
  12. Android 的安全性岌岌可危!
  13. 给笔记本更换SSD硬盘
  14. c++访问私有(private)成员变量的常用方法
  15. php web browser,如何在PHP Scriptable Web Browser中調用javascript函數
  16. URLRewrite 在 iis6+iis7中的配置
  17. vsCode实现美化代码
  18. Word导出PDF后,PDF没有生成Word中对应的目录
  19. 深入挖掘AJAX(来源:http://superbo8888.javaeye.com)
  20. 全球及中国散热产业产销规模及投资盈利预测报告2021版

热门文章

  1. .net简单web开发
  2. Tomcat 部署方式
  3. html page 制作,webpageHTML
  4. Windows下自动连接WiFi 脚本
  5. 【Linux】Linux安装nginx操作详细步骤
  6. 数值计算笔记之插值(四)三次样条插值
  7. partial overwrite
  8. go语言gin项目创建
  9. 使用docker搭建DVWA
  10. ws2812驱动总结(包括对时序的详细分析,代码基于STC15系列单片机)