title: Flume系列


第一章 Flume基础理论

1.1 数据收集工具产生背景

Hadoop 业务的一般整体开发流程:

任何完整的大数据平台,一般都会包括以下的基本处理过程:

数据采集
数据 ETL
数据存储
数据计算/分析
数据展现

其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出。这其中包括:

数据源多种多样
数据量大,变化快
如何保证数据采集的可靠性的性能
如何避免重复数据
如何保证数据的质量

我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠, 高性能和高扩展。

总结:
数据的来源大体上包括:

1、业务数据
2、爬取的网络公开数据
3、购买数据
4、自行采集日志数据

1.1 Flume简介

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

​ Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方,用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。

1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用来采集流动型数据。

2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集,它支持从很多数据源聚合数据到 HDFS。

3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。

4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、可靠性。

1.2 Flume版本

Flume 在 0.9.x and 1.x 之间有较大的架构调整:
1.x 版本之后的改称 Flume NG
0.9.x 版本称为 Flume OG,最后一个版本是 0.94,之后是由 Apache 进行了重构
N是New 和 O是Old

Flume1.7版本要求:

Flume OG  Old/Original Generation
Flume NG  New/Next     Generation

注意,上面是flume1.7的要求,其他版本要求可能会不一样!!

本文使用版本链接:http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

官网链接:http://flume.apache.org/

Flume1.9 版本要求:

System Requirements

Java Runtime Environment - Java 1.8 or later
Memory - Sufficient memory for configurations used by sources, channels or sinks
Disk Space - Sufficient disk space for configurations used by channels or sinks
Directory Permissions - Read/Write permissions for directories used by agent

第二章 Flume体系结构/核心组件

agent:能独立执行一个数据收集任务的JVM进程
source : agent中的一个用来跟数据源对接的服务
channel : agent内部的一个中转组件
sink : agent中的一个用来跟数据目的地对接的服务
event: 消息流转的一个载体/对象header  body常见source的类型Avro source :接收网络端口中的数据exec source: 监听文件新增内容   tail -fspooldir source :监控文件夹的,如果这个文件夹里面的文件发送了变化,就可以采集Taildir source: 多目录多文件实时监控常见的channel的类型memory : 内存中  , 快 , 但不安全file : 相对来说安全些,但是效率低些jdbc: 使用数据库进行数据的保存常见的sink的类型logger   做测试使用HDFS    离线数据的sink 一般Kafka   流式数据的sink
以上仅仅是常见的一些,官网中有完整的。

2.1 介绍

​ Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

​ Flume 以 agent 为最小的独立运行单位

​ 一个 agent 就是一个 JVM

​ 单 agent 由 Source、Sink 和 Channel 三大组件构成。

如下面官网图片

解释:

2.2 Flume三大核心组件

Event
Event 是 Flume 数据传输的基本单元。
Flume 以事件的形式将数据从源头传送到最终的目的地。
Event 由可选的 header 和载有数据的一个 byte array 构成。
载有的数据对 flume 是不透明的。
Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。
Header 可以在上下文路由中使用扩展。

Client
Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体
目的是从数据源系统中解耦 Flume
在 Flume 的拓扑结构中不是必须的

Agent
一个 Agent 包含 source,channel,sink 和其他组件。
它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。
Agent 是 flume 流的基础部分。
Flume为这些组件提供了配置,声明周期管理,监控支持。

Agent 之 Source
Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个
包含 event 驱动和轮询两种类型
不同类型的 Source
与系统集成的 Source:Syslog,Netcat,监测目录池
自动生成事件的 Source:Exec
用于 Agent 和 Agent 之间通信的 IPC source:avro,thrift
Source 必须至少和一个 channel 关联

Agent 之 Channel
Channel 位于 Source 和 Sink 之间,用于缓存进来的 event
当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除
不同的 channel 提供的持久化水平也是不一样的
Memory Channel:volatile(不稳定的)
File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现
JDBC Channel:基于嵌入式 database 实现
Channel 支持事务,提供较弱的顺序保证
可以和任何数量的 source 和 sink 工作

Agent 之 Sink
Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除
不同类型的 sink ,比如 HDFS,HBase

2.3 Flume经典部署方案

1、单Agent采集数据

​ 由一个 agent 负责把从 web server 中收集数据到 HDFS 。

2、多Agent串联

​ 在收集数据的过程中,可以让多个 agent 串联起来,形成一条 event 数据线,进行传输,但 是注意的是:相邻两个 agent 的前一个 agent 的 sink 类型要和后一个 agent 的 source 类型一 致。

3、多Agent合并串联

​ 多个 agent 串联,并联成一个复杂的 数据收集架构。反映了 flume 的部署灵活。并且针对关键节点,还可以进行高可用配置。

4、多路复用

​ 一份数据流,可以被复制成多份数据流,交给多个不同组件进行处理。一般用于一边永久存储一边进行计算。

第三章 Flume安装及案例

3.1 安装部署

3.1.1 Flume1.7安装部署

1、将apache-flume-1.7.0-bin.tar.gz上传到hadoop0的/software目录下,并解压

[root@hadoop0 software]# tar -zxvf apache-flume-1.7.0-bin.tar.gz

2、重命名为flume

[root@hadoop0 software]# mv apache-flume-1.7.0-bin flume

3、修改flume-env.sh文件

[root@hadoop0 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk

3.1.2 Flume1.9安装部署

1、将apache-flume-1.9.0-bin.tar.gz上传到hadoop10的/software目录下,并解压

[root@hadoop10 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz

2、重命名为flume

[root@hadoop10 software]# mv apache-flume-1.9.0-bin flume

3、修改flume-env.sh文件

[root@hadoop10 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk

4、看看Flume版本

[root@hadoop10 bin]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@hadoop10 bin]# pwd
/software/flume/bin
[root@hadoop10 bin]#

3.2 案例

3.2.1 监控端口数据(官方案例)

1、在flume的目录下面创建文件夹
[root@hadoop0 flume]# mkdir job
[root@hadoop0 flume]# cd job
2、定义配置文件telnet-logger.conf
[root@hadoop0 job]# vim telnet-logger.conf
添加内容如下:# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c13、先开启flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console4、执行telnet localhost 44444
telnet localhost 44444
会先报找不到telnet
[root@hadoop10 flume]# telnet localhost 44444
bash: telnet: command not found...
[root@hadoop10 flume]#
然后执行yum -y install telnet
5、发送命令测试即可

针对于上述配置telnet-logger.conf文件的内容的解释:

# example.conf: A single-node Flume configuration# Name the components on this agent  #a1: 表示的是agent的名字
a1.sources = r1        #r1 : 表示的是a1的输入源
a1.sinks = k1          #k1 : 表示的a1的输出目的地
a1.channels = c1   #c1 : 表示的a1的缓冲区# Describe/configure the source   #配置source
a1.sources.r1.type = netcat        #表示a1的输入源r1的类型是netcat类型
a1.sources.r1.bind = localhost  #表示a1监听的主机
a1.sources.r1.port = 44444      #表示a1监听的端口号# Describe the sink         #描述sink
a1.sinks.k1.type = logger  #表示a1的输入目的地k1的类型是logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory       #表示a1的channel的类型是memory类型
a1.channels.c1.capacity = 1000     #表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100  #表示a1的channel传输的时候收集到了100个event以后再去提交事务# Bind the source and sink to the channel
a1.sources.r1.channels = c1  #表示将r1和c1 连接起来
a1.sinks.k1.channel = c1     #表示将k1和c1 连接起来3、先开启flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,consolebin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger2.conf -Dflume.root.logger=INFO,console参数说明:--conf conf : 表示配置文件在conf目录--name a1  :  表示给agent起名为a1 --conf-file job/telnet-logger.conf : flume本次启动所要读取的配置文件在job文件夹下面的telnet-logger.conf文件-Dflume.root.logger=INFO,console : -D 表示flume运行时候的动态修改flume.root.logger参数值,并将日志打印到控制台,级别是INFO级别。日志级别: log、info、warn、error

3.2.2 监控目录中的文件到HDFS

1、创建配置文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
添加下面的内容:a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c32、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

针对于上述配置dir-hdfs.conf文件的内容的解释:

1、创建配置文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
添加下面的内容:a3.sources = r3     #定义source为r3
a3.sinks = k3       #定义sink为k3
a3.channels = c3    #定义channel为c3# Describe/configure the source  #配置source相关的信息
a3.sources.r3.type = spooldir    #定义source的类型是spooldir类型
a3.sources.r3.spoolDir = /software/flume/upload   #定义监控的具体的目录
a3.sources.r3.fileSuffix = .COMPLETED            #文件上传完了之后的后缀
a3.sources.r3.fileHeader = true                      #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)        #忽略以tmp结尾的文件,不进行上传# Describe the sink         #配置sink相关的信息
a3.sinks.k3.type = hdfs        #定义sink的类型是hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H    #文件上传到hdfs的具体的目录
a3.sinks.k3.hdfs.filePrefix = upload-      #文件上传到hdfs之后的前缀
a3.sinks.k3.hdfs.round = true              #是否按照时间滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1                #多长时间单位创建一个新的文件
a3.sinks.k3.hdfs.roundUnit = hour          #时间单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true   #是否使用本地时间
a3.sinks.k3.hdfs.batchSize = 100           #积累多少个event才刷写到hdfs一次
a3.sinks.k3.hdfs.fileType = DataStream     #文件类型
a3.sinks.k3.hdfs.rollInterval = 600            #多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700      #多大生成新文件
a3.sinks.k3.hdfs.rollCount = 0             #多少event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1      #副本数# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c32、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

在执行上面的命令过程中遇到的了一点点小问题

......
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)Vat org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)at java.lang.Thread.run(Thread.java:748)

解决方案:将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop的版本。可以通过重命名的方式注释掉即可(实现删除的效果)。

[root@hadoop10 lib]# mv guava-11.0.2.jar guava-11.0.2.jar.backup

3.2.3 监控文件到HDFS

1、创建一个自动化文件
[root@hadoop0 job]# vim mydateauto.sh
写入:
#!/bin/bashwhile true
doecho `date`sleep 1
done然后运行测试:
[root@hadoop0 job]# sh mydateauto.sh
Wed Aug 19 18:34:19 CST 2020
Wed Aug 19 18:34:20 CST 2020然后修改配置,将输出的日志追加到某个文件中
#!/bin/bashwhile true
doecho `date` >> /software/flume/mydate.txtsleep 1
done再次执行[root@hadoop0 job]# sh mydateauto.sh
就会在flume的文件夹下面生成了mydate.txt文件
通过tail -f mydate.txt 查看
再次执行sh mydateauto.sh  查看输出。2、创建配置vim file-hdfs.conf# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /software/flume/mydate.txt
a2.sources.r2.shell = /bin/bash -c# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c23、启动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

针对于上述配置file-hdfs.conf文件的内容的解释:

# Name the components on this agent
a2.sources = r2        #定义source为r2
a2.sinks = k2      #定义sink为k2
a2.channels = c2   #定义channel为c2# Describe/configure the source
a2.sources.r2.type = exec  #定义source的类型是exec 可执行命令
a2.sources.r2.command = tail -F /software/flume/mydate.txt #具体文件位置
a2.sources.r2.shell = /bin/bash -c  #命令开头# Describe the sink   #sink相关配置
a2.sinks.k2.type = hdfs        #定义sink的类型是hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H       #具体的位置
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600   #单位是秒!!
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c23、启动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

过程中遇到的一点点小问题:

18 Oct 2021 14:32:24,340 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k2, type: hdfs
18 Oct 2021 14:32:24,348 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:469)  - Sink k2 has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403)at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462)at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

解决方案:

问题原因:原因其实很明了了,就是字面的意思,channel 与 sink的设置不匹配,sink的batch size大于channel的transaction capacity
解决方案:将a2.sinks.k2.hdfs.batchSize设置为小于等于100 。 或者注释掉也可以。

3.2.4 多目录多文件实时监控(Taildir Source)

与前面使用到的Source的对比

Spooldir Source 用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。
Exec source 用于监控一个实时追加的文件,不能实现断点续传;
Taildir Source 用于监听多个实时追加的文件,并且能够实现断点续传。

操作案例:

1、在job下面创建 vim taildir-hdfs.confa3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /software/flume/taildir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /software/flume/taildirtest/filedir/.*file.*
a3.sources.r3.filegroups.f2 = /software/flume/taildirtest/logdir/.*log.*# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/uploadtaildir/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c32、创建文件文件夹,注意需要在启动之前创建监控的文件夹
[root@hadoop10 flume]# mkdir taildirtest
[root@hadoop10 flume]# cd taildirtest/
[root@hadoop10 taildirtest]# ll
total 0
[root@hadoop10 taildirtest]# mkdir filedir
[root@hadoop10 taildirtest]# mkdir logdir
[root@hadoop10 taildirtest]# ll
total 0
drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir
drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir
[root@hadoop10 taildirtest]# vim file.txt
[root@hadoop10 taildirtest]# vim log.txt
[root@hadoop10 taildirtest]# ll
total 8
drwxr-xr-x. 2 root root  6 Oct 18 16:44 filedir
-rw-r--r--. 1 root root 35 Oct 18 16:45 file.txt
drwxr-xr-x. 2 root root  6 Oct 18 16:45 logdir
-rw-r--r--. 1 root root 35 Oct 18 16:46 log.txt3、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-hdfs.conf4、测试
[root@hadoop10 taildirtest]# cp file.txt filedir/
[root@hadoop10 taildirtest]# cp log.txt logdir/
[root@hadoop10 taildirtest]# cd filedir/
[root@hadoop10 filedir]# echo hello1 >> file.txt
[root@hadoop10 filedir]# cd ../logdir/
[root@hadoop10 logdir]# echo hello2 >> log.txt
[root@hadoop10 logdir]# 

声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。

By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

浅析数据采集工具Flume相关推荐

  1. 数据采集工具 -- Flume

    1.Flume的定义 Flume由Cloudera公司开发,是一个分布式.高可靠.高可用的海量日志采集.聚合.传输的系统. Flume支持在日志系统中定制各类数据发送方,用于采集数据: Flume提供 ...

  2. 大数据采集工具Flume

    Flume 一.概述 1.基础架构 2.拓扑结构 3.Agent内部原理 二.安装Flume 三.入门案例 1.监控端口 2.实时监控单个追加文件 3.读取目录新文件到HDFS 4.实时监控多个追加文 ...

  3. 数据采集工具flume

    概述 flume是在2011年被首次引入到Cloudera的CDH3分发中,2011年6月,Cloudera将flume项目捐献给Apache基金会.2012年,flume项目从孵化器变成了顶级项目, ...

  4. 数据采集工具之Flume的常用采集方式详细使用示例

    数据采集工具之Flume的常用采集方式详细使用示例 Flume Flume概述 Flume架构 核心的组件 常用Channel.Sink.Source类型 Flume架构模式 安装Flume Flum ...

  5. 数据采集:Flume和Logstash的工作原理和应用场景

    在某个Logstash的场景下,我产生了为什么不能用Flume代替Logstash的疑问,因此查阅了不少材料在这里总结,大部分都是前人的工作经验下,加了一些我自己的思考在里面,希望对大家有帮助. 大数 ...

  6. 大数据数据采集工具简介

    随着大数据技术体系的发展,越来越多的企业应用大数据技术支撑自己的业务发展.数据采集作为大数据的起点,是企业主动获取数据的一种重要手段.数据采集的多样性.全面性直接影响数据质量. 企业获取数据的渠道分为 ...

  7. 猿创征文|大数据开发必备的数据采集工具汇总

    文章目录 前言 1.Flume 适用场景 工作方式 2.Flink CDC 适用场景 工作方式 3.Sqoop 适用场景 工作方式 4.Canal 适用场景 工作方式 5.Kettle 适用场景 工作 ...

  8. 大数据获取方法;数据采集工具;常用ETL工具简介

    1.采用哪些方式可以获得大数据? 方式1.外部购买数据 有很多公司或者平台是专门做数据收集和分析的,企业会直接从那里购买数据或者相关服务给数据分析师,这是一种常见的获取数据的方式之一. 方式2.网络爬 ...

  9. 用python读取dat文件,wifi数据采集工具csi tool数据文件(.dat文件)解析

    项目地址: https://github.com/hongshixian/CSI_reader 前言 数据采集工具csi_tool采集数据并保存为后缀.dat的数据文件,在csi_tool中提供一个c ...

最新文章

  1. Python调整图片大小并保存调整后的图像
  2. opencv图像恢复逆滤波_OpenCV之快速的图像边缘滤波算法
  3. matlab作图标记
  4. rest api_REST API的演变
  5. windows服务器下com6僵尸***删除
  6. iphone7防水_iPhone11系列防水测试,其结果令人意外
  7. 五道口服装市场关闭前 职业“甩货人”赚一笔
  8. vue中遇到的问题:Error: Cannot find module 'chalk'
  9. webform(八)组合查询
  10. 注意!黑客可以通过CSS3功能攻击浏览器
  11. scrapy数据清洗:
  12. 使用最广泛的缓存Redis,升级到6.0后超神了
  13. spss软件测试题题库,spss期末考试试题及答案
  14. 11种免费获取SSL证书的方式
  15. ftp服务器上的文件夹是否存在,检查FTP服务器上是否存在文件
  16. 【速成MSP430f149】电赛期间学习MSP430f149笔记
  17. 工行u盾显示316_工行U盾常见故障处理
  18. IceSword(冰刃)V1.22 Final 绿色汉化修正版
  19. 如何做好虾皮跨境电商?关于Shopee店铺快速开单的真相!
  20. 深圳x医院ICU危重症信息系统MongoDB误删-恢复记录-2020.10.17

热门文章

  1. Redis 集群原理,再也不怕面试被问倒
  2. 有序顺序表中插入一个元素,使其仍然有序
  3. 跨境进出口电商供应链系统模式以及痛点
  4. linux lun分区,Linux服务器新增LUN而不需重启的实现
  5. uniapp微信小程序连接蓝牙打印机 打印文字、图片
  6. Win8风格界面效果
  7. Linux 用户行为日志审计 日志监控
  8. [洛谷]P4158 [SCOI2009]粉刷匠 (#线性dp+背包dp)
  9. C#高级--加密解密详解
  10. Happy WiFi,Happy life