Flume 入门--几种不同的Sinks
主要介绍几种常见Flume的Sink--汇聚点
1.Logger Sink
记录INFO级别的日志,一般用于调试。前面介绍Source时候用到的Sink都是这个类型的Sink
必须配置的属性:
属性说明:
!channel –
!type – The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log
要求必须在 --conf 参数指定的目录下有 log4j的配置文件
可以通过-Dflume.root.logger=INFO,console在命令启动时手动指定log4j参数
案例:前面的例子都是这种类型的Sink
2.File Roll Sink
在本地文件系统中存储事件。每隔指定时长生成文件保存这段时间内收集到的日志信息。
属性说明:
!channel –
!type – 类型,必须是"file_roll"
!sink.directory – 文件被存储的目录
sink.rollInterval 30 滚动文件每隔30秒(应该是每隔30秒钟单独切割数据到一个文件的意思)。如果设置为0,则禁止滚动,从而导致所有数据被写入到一个文件。
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize 100
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
编写配置文件:
#命名Agent a1的组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#描述/配置Source
a1.sources.r1. type = http
a1.sources.r1.port = 6666
#描述Sink
a1.sinks.k1. type = file_roll
a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1 .6.0-bin /mysink
#描述内存Channel
a1.channels.c1. type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#为Channle绑定Source和Sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
启动flume:
1
|
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template7 .conf --name a1 -Dflume.root.logger=INFO,console
|
测试:
通过curl命令向目标主机发送请求,就会发现在指定的文件夹下出现记录收集日志的文件
3.Avro Sink
是实现多级流动 和 扇出流(1到多) 扇入流(多到1) 的基础。非常重要 但是需要多台机器
必要属性说明:
!channel –
!type – The component type name, needs to be avro.
!hostname – The hostname or IP address to bind to.
!port – The port # to listen on.
案例1.多级流动 h1流动到h2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
h2:
配置配置文件:
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
#描述Sink
a1.sinks.k1. type =logger
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template8 .conf --name a1 -Dflume.root.logger=INFO,console
h1:
配置配置文件
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =http
a1.sources.r1.port=8888
#描述Sink
a1.sinks.k1. type =avro
a1.sinks.k1. hostname =192.168.242.138
a1.sinks.k1.port=9988
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.chafile: ///C : /Users/park/Desktop/Day01_Flume/ %E6%96%87%E6%A1%A3 /Flume %201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm #irc-sinknnels=c1
a1.sinks.k1.channel=c1
|
启动flume
发送http请求到h1:
1
|
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http: //192 .168.242.133:8888
|
稍等几秒后,发现h2最终收到了这条消息
案例2:扇出流(h1扇出到h2,h3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
h2 h3:
配置配置文件:
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
#描述Sink
a1.sinks.k1. type =logger
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template8 .conf --name a1 -Dflume.root.logger=INFO,console
h1:
配置配置文件
#命名Agent组件
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2
#描述/配置Source
a1.sources.r1. type =http
a1.sources.r1.port=8888
#描述Sink
a1.sinks.k1. type =avro
a1.sinks.k1. hostname =192.168.242.138
a1.sinks.k1.port=9988
a1.sinks.k2. type =avro
a1.sinks.k2. hostname =192.168.242.135
a1.sinks.k2.port=9988
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c2. type =memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
|
案例3:扇入流()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
m3:
编写配置文件:
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
#描述Sink
a1.sinks.k1. type =logger
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template .conf --name a1 -Dflume.root.logger=INFO,console
m1、m2:
编写配置文件:
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =http
a1.sources.r1.port=8888
#描述Sink
a1.sinks.k1. type =avro
a1.sinks.k1. hostname =192.168.242.135
a1.sinks.k1.port=4141
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template9 .conf --name a1 -Dflume.root.logger=INFO,console
m1通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
[root@localhost conf] # curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
m2通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
[root@localhost conf] # curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
发现m3均能正确收到消息
|
4、HDFS Sink
此Sink将事件写入到Hadoop分布式文件系统HDFS中。
目前它支持创建文本文件和序列化文件。对这两种格式都支持压缩。 这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础。
它还通过类似时间戳或机器属性对数据进行 buckets/partitions 操作
HDFS的目录路径可以包含将要由HDFS替换格式的转移序列用以生成存储事件的目录/文件名。
使用这个Sink要求hadoop必须已经安装好,以便Flume可以通过hadoop提供的jar包与HDFS进行通信。
注意,此版本hadoop必须支持sync()调用。
必要属性说明:
!channel –
!type – 类型名称,必须是“HDFS”
!hdfs.path – HDFS 目录路径 (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在目录下创建文件的名称前缀
hdfs.fileSuffix – 追加到文件的名称后缀 (eg .avro - 注: 日期时间不会自动添加)
hdfs.inUsePrefix – Flume正在处理的文件所加的前缀
hdfs.inUseSuffix .tmp Flume正在处理的文件所加的后缀
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#命名Agent组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述/配置Source
a1.sources.r1. type =http
a1.sources.r1.port=8888
#描述Sink
a1.sinks.k1. type =hdfs
a1.sinks.k1.hdfs.path=hdfs: //0 .0.0.0:9000 /ppp
#描述内存Channel
a1.channels.c1. type =memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
#为Channel绑定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
|
启动flume:
1
|
. /flume-ng agent --conf .. /conf --conf- file .. /conf/template9 .conf --name a1 -Dflume.root.logger=INFO,console
|
测试:通过利用curl给目的主机发送命令,会发现在HDFS中会生成相应的记录文件。
Flume 入门--几种不同的Sinks相关推荐
- Flume 入门教程(超详细)
文章目录 1. Flume 概述 1.1 Flume 定义 1.2 Flume 基础架构 1.2.1 Agent 1.2.2 Source 1.2.3 Sink 1.2.4 Channel 1.2.5 ...
- Flume中sources、channels、sinks的常用配置(多个案例)
Flume中sources.channels.sinks的常用配置(多个案例) 文章目录 Flume中sources.channels.sinks的常用配置(多个案例) Flume基础及架构 案例1: ...
- html圆如何找到垂直中心线,HTML+CSS入门 7种方式实现垂直居中
本篇教程介绍了HTML+CSS入门 7种方式实现垂直居中,希望阅读本篇文章以后大家有所收获,帮助大家HTML+CSS入门. < 1.使用绝对定位垂直居中 绝对对位原理:元素在过度受限情况下,将m ...
- 大数据学习——Flume入门
文章目录 一.Flume概述 1.1.Flume定义 1.2.Flume基础架构 二.Flume快速入门 2.1.安装Flume部署 2.2.入门案例 2.2.1.监控端口数据(官方案例) 2.2.2 ...
- 【Flume】Flume入门
Flume 简介 Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generatio ...
- Flume入门——Selector、Chanel等
1.selector (http://blog.csdn.net/looklook5/article/details/40430965) (http://blog.csdn.net/xiao_jun_ ...
- mysql凡人入门_3种PHP连接MYSQL数据库的常用方法
对于PHP入门用户来说,我们只要掌握基本的数据库写入.读取.编辑.删除等基本的操作就算入门,也可以写出简单的程序出来,比如留言本.新闻文章系统等等. 在整个过程中,MySQL数据库的连接也是比较重要的 ...
- Flume监控几种方式
Flume主要有以下集中监控方式: 1.JMX监控 配置 {$FLUME_HOME}/flume-env.sh cd $FLUME_HOME vi flume-env.sh JAVA_OPTS=&qu ...
- flume的几种使用方式
近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合.感觉一时难以上手,先从简单着手吧: 一.net ...
最新文章
- (转)探究 TCP 一次数据包最大负载,上限真的是 65495 byte 吗
- MySQL索引对NULL值的处理
- 33条C#、.Net经典面试题目及答案[zt]
- java velocity详解_[velocity] velocity详解
- 第一次CODING附parentElement.insertBefore使用详解
- 人生的意义,呵!我找到了
- 【BZOJ】2289: 【POJ Challenge】圆,圆,圆
- Python实战从入门到精通第十四讲——定义有默认参数的函数
- win10搭建无盘服务器配置,win10系统无盘安装教程
- 【微信页面】移动端微信页面禁止字体放大
- 全网详细接口测试ApiPost详细教程(实战),吐血整理
- 我们应如何度过自己的大学生活?
- 收银设备对接php,快速对接payjs的个人支付接口(收银台模式)
- App Inventor 2 连接调试器的各种方式比较
- Vivado_IDE(1)熟悉环境
- 计算机安装过程突然断电怎么办,安装SQL2008到一半,电脑突然断电怎么办
- 成熟男人与未成熟男人
- 中国魔笛痛改前非做好准备 国足不能失去传奇大师
- ios客户端学习-被苹果开发者中心拒绝附件上传不上
- 关于初学算法,习题2-2 韩信点兵(hanxin)的一些需要谨记的点