一、Sink Processors

接下来看一下Sink处理器
Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor

DefaultSink Processor是默认的,不用配置sinkgroup,就是咱们现在使用的这种最普通的形式,一个channel后面接一个sink的形式。

Load balancing Sink Processor是负载均衡处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,根据指定的算法进行轮询或者随机发送,减轻单个sink的压力。

Failover Sink Processor是故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。

下面我们来看两个案例:
负载均衡、故障转移

先看负载均衡

二、Load balancing Sink Processor【负载均衡】


看这个图,在这里面一个channel后面接了两个sink,这两个sink属于一个sink group,具体的数据发送策略我们来看一下文档

看中间的参数信息,
processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可【注意,这里写的是processor.sinks,但是在下面的example中使用的是sinks,实际上就是sinks,所以文档也是有一些瑕疵的,不过Flume的文档已经算是写的非常好的了】

processor.type:针对负载均衡的sink处理器,这里需要指定load_balance

processor.selector:此参数的值内置支持两个,round_robin和random,

round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。

processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长;一直到达到最大的时间。

如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率。
processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒

最后看一下里面提供的例子:

# 指定sinkgroup的名称
a1.sinkgroups = g1
# 指定sinkgroup中有哪些sink
a1.sinkgroups.g1.sinks = k1 k2
# 使用负载均衡策略
a1.sinkgroups.g1.processor.type = load_balance
# 开启后,故障的节点会列入黑名单
a1.sinkgroups.g1.processor.backoff = true
# 指定sinkgroup的数据发送策略
a1.sinkgroups.g1.processor.selector = random

这个负载均衡案例可以解决之前单节点输出能力有限的问题,可以通过多个sink后面连接多个Agent实现负载均衡,如果后面的Agent挂掉1个,也不会影响整体流程,只是处理效率又恢复到了之前的状态。
下面来配置Agent,先配置bigdata04节点上的这个Agent

[root@bigdata04 conf]# vi load-balancing.conf
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置sink组件,[为了方便演示效果,把batch-size设置为1]
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.182.101
a1.sinks.k1.port=41414
a1.sinks.k1.batch-size = 1a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.182.102
a1.sinks.k2.port=41414
a1.sinks.k2.batch-size = 1# 配置sink策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

接着配置bigdata02的Agent

[root@bigdata02 conf]# vi load-balancing-101.conf
# agent的名称是a1# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

接着配置bigdata03上的Agent

[root@bigdata03 conf]# vi load-balancing-102.conf
# agent的名称是a1# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值]
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意:bigdata04中指定的a1.sinks.k1.port=41414和a1.sinks.k2.port=41414需要和bigdata02和bigdata03中的a1.sources.r1.port = 41414的值保持一致

接下来开始启动这几个Agent
需要先启动bigdata02和bigdata03上的Agent,最后启动bigdata04上的Agent

启动bigdata02上的Agent

注意:在启动之前需要到/etc/profile中先配置HADOOP_HOME环境变量,因为这个Agent中使用到了hdfs

[root@bigdata02 apache-flume-1.9.0-bin]# vi /etc/profile
........
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
[root@bigdata02 apache-flume-1.9.0-bin]# source /etc/profile
[root@bigdata02 apache-flume-1.9.0-bin]#  bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console

启动bigdata03上的Agent

注意:在启动之前需要到/etc/profile中先配置HADOOP_HOME环境变量,因为这个Agent中使用到了hdfs

[root@bigdata03 apache-flume-1.9.0-bin]# vi /etc/profile
........
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
[root@bigdata03 apache-flume-1.9.0-bin]# source /etc/profile
[root@bigdata03 apache-flume-1.9.0-bin]#  bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console

启动bigdata04上的Agent

[root@bigdata04 apache-flume-1.9.0-bin]#  bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

通过telnet产生数据

[root@bigdata04 ~]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK

分别到bigdata02和bigdata03中hdfssink指定的path中验证数据

[root@bigdata01 soft]# hdfs dfs -ls hdfs://192.168.182.100:9000/load_balance
Found 2 items
-rw-r--r--   2 root supergroup          6 2020-05-03 12:44 hdfs://192.168.182.100:9000/load_balance/data101.1588481094383.log.tmp
-rw-r--r--   2 root supergroup          6 2020-05-03 12:44 hdfs://192.168.182.100:9000/load_balance/data102.1588481087463.log.tmp
[root@bigdata01 soft]# hdfs dfs -cat hdfs://192.168.182.100:9000/load_balance/data101.1588481094383.log.tmp
haha
[root@bigdata01 soft]# hdfs dfs -cat hdfs://192.168.182.100:9000/load_balance/data102.1588481087463.log.tmp
hehe

这就是sink负载均衡的使用,在这里面我们使用了轮询的策略,大家下去以后可以使用random试验一下,只需要修改bigdata04配置文件中a1.sinkgroups.g1.processor.selector的值即可。

Flume10:【案例】Sink Processors之 负载均衡:Load balancing Sink Processor相关推荐

  1. Nginx(四):负载均衡Load balancing

    1.概念基础 Load balancing,即负载均衡,是一种计算机技术,用来在多个计算机(计算机集群).网络连接.CPU.磁盘驱动器或其他资源中分配负载,以达到最优化资源使用.最大化吞吐率.最小化响 ...

  2. 【高可用HA】Apache (4) —— Mac下配置Apache Httpd负载均衡(Load Balancer)之mod_jk

    Mac下配置Apache Httpd负载均衡(Load Balancer)之mod_jk httpd版本: httpd-2.4.17 jk版本: tomcat-connectors-1.2.41 参考 ...

  3. 负载均衡Load Balance学习

    目录 1. 负载均衡简介 2. 负载均衡算法 3. Nginx负载均衡调度算法源码调研 1. 负载均衡简介 0x1: 负载均衡是什么 负载均衡是一种技术架构方法,它并不是具体指哪一种技术,也正是因为这 ...

  4. 负载均衡设备oracle,Oracle RAC 服务器端联接负载均衡(Load Balance)

    Oracle RAC 服务器端连接负载均衡(Load Balance) Oracle RAC服务器端的负载均衡是根据RAC中各节点的连接负荷数情况,将新的连接请求分配到负荷最小的节点上去.当数据库处于 ...

  5. Oracle RAC 客户端连接负载均衡(Load Balance)

    实现负载均衡(Load Balance)是Oracle RAC最重要的特性之一,主要是把负载平均分配到集群中的各个节点,以提高系统的整体吞吐能力.通常情况下有两种方式来实现负载均衡,一个是基于客户端连 ...

  6. 负载均衡Load Balance(F5 \ nginx \ LVS \ DNS轮询)

    文章目录 一.IP负载均衡 (一)硬件方式 (二)软件方式 二.DNS轮询 三.CDN (一)定义 (二)CDN网络结构 (三)CDN工作原理 在做系统架构规划的时候,负载均衡,HA(高可用性集群,是 ...

  7. 网页加载出现没有合适的负载均衡器_终于讲清楚了,什么是负载均衡(Load balancing)...

    什么是负载均衡(Load balancing) 在网站创立初期,我们一般都使用单台机器对台提供集中式服务,但随着业务量越来越大,无论性能还是稳定性上都有了更大的挑战.这时候我们就会想到通过扩容的方式来 ...

  8. 负载均衡解析与Nginx实战

    1. 负载均衡的概念 1.1 什么是负载均衡 Load Balancing,即负载均衡,是一种计算机技术,用来在多个计算机(计算机集群).网络连接.CPU.磁盘驱动器或其他资源中分配负载,以达到最优化 ...

  9. 运维企业专题(3)LB负载均衡介绍

    1.LB是什么?有哪些主要的方式? LB--负载均衡(Load Balancing ) 负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽.增加吞吐量.加强网络数 ...

最新文章

  1. Simics系统模拟器
  2. python迷宫算法及实现_Python迷宫递归算法
  3. oracle schema_了解Oracle备份恢复的知识
  4. 在windows下rust编译出现gcc.exe的错误
  5. 小白使用ubuntu杂记
  6. Java 生成随机数并进行查找
  7. 腾龙视觉设计学院孙姣老师讲pscs6艺术照片课录屏
  8. 解决虚拟机-虚拟网络配置没有桥接模式,本地没有虚拟网卡
  9. Alexnet网络结构详解
  10. html弹出div弹窗
  11. D3DAPI大全,全部函数
  12. android手机刷win10吗,有哪些手机可以刷win10系统?Android机上将可刷Win10?
  13. SVN上传不了.so .a库可尝试的解决方法
  14. ElasticSearch从入门到精通--第七话(自动补全、拼音分词器、自定义分词、数据同步方案)
  15. 性能测试--稳定性测试
  16. python基础技巧(三)——tf
  17. 大数据数据仓库 Hive (三)
  18. MyBatis分表分库插件的实现(推荐)
  19. 具有自适应调整策略的混沌灰狼优化算法-附代码
  20. MATLAB中DEMUX分配输出的方法

热门文章

  1. Spring(完整版)
  2. 【日志日记】一、网络句子摘抄
  3. 【Error】监听dblclick双击事件不起作用
  4. 逆战:微信小程序开发(二)
  5. 欢乐Flash夏令营
  6. php购物车js价钱总和,JavaScript 购物车自动计算价格
  7. 洋哥以亲身经历告诉你:要不要放弃高薪去做CTO?
  8. android 设置界面10秒无操作退休,关于Android系统无用户操作相关处理
  9. sqlserver 截取日期年份和月份使用datepart函数
  10. 2019.4.10-华为-勇敢星实习-笔试题目-第一题字符串处理