Failover Sink Processor

Failover Sink Processor维护了一个sink的优先级列表,具有故障转移的功能,具体的配置如下(加粗的必须配置):

属性名称 默认值 描述
sinks 多个sink用空格分开。
processor.type default 组件的名称,必须是:failover
processor.priority.<sinkName> 优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。
注:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。且failover时,同优先级的不会Failover,就算是同优先级的还存在也会报All sinks failed to process。
processor.maxpenalty 30000 失败的Sink最大的退避时间(单位:毫秒)(退避算法(退避算法为我们在解决重试某项任务的时候,提供了一个比较好的等待思想。),参考:http://qiuqiang1985.iteye.com/blog/1513049)

示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor

Load balancing sink processor 提供了多个sinks负载均衡的能力。它维护了一个active sinks列表,该列表中的负载必须是分布式的。实现了round_robin(轮询调度) 或者 random(随机) 的选择机制,默认是:round_robin(轮询调度)。也可以通过继承AbstractSinkSelector类来实现自定义的选择机制。
当被调用时,选择器根据配置文件的选择机制挑选下一个sink,并且调用该sink。如果所选的Sink传递Event失败,则通过选择机制挑选下一个可用的Sink,以此类推。

属性名称 默认 描述
processor.sinks 多个sink用空格分开。
processor.type default 组件的名称,必须是:load_balance
processor.backoff false 是否以指数的形式退避失败的Sinks。
processor.selector round_robin 选择机制。必须是round_robinrandom 或者自定义的类,该类继承了AbstractSinkSelector
processor.selector.maxTimeOut 30000 默认是30000毫秒,屏蔽故障sink的时间

示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Failover和Load balancing实例

测试环境:

10.0.1.76(Client)

10.0.1.68 (Failover和Load balancing)

10.0.1.70

10.0.1.77

10.0.1.85

10.0.1.86

10.0.1.87

以10.0.1.76作为客户端,通过exec获取nginx的日志信息,然后将数据传到10.0.1.68(配置了Failover和Load balancing)的节点,最后10.0.1.68将数据发送的10.0.1.70,77,85,86,87节点,这些节点最终将数据写到本地硬盘。

10.0.1.76的配置:

[java] view plain copy
  1. a1.channels.c1.type = memory
  2. a1.channels.c1.capacity = 1000
  3. a1.channels.c1.transactionCapacity = 100
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.type = exec
  6. a1.sources.r1.command = tail -n 0 -F /home/nginx/logs/access.log
  7. a1.sinks.k1.type = avro
  8. a1.sinks.k1.channel = c1
  9. a1.sinks.k1.hostname = 10.0.1.68
  10. a1.sinks.k1.port = 41415
  11. a1.channels = c1
  12. a1.sources = r1
  13. a1.sinks = k1

获取nginx产生的日志,然后通过avro发送的10.0.1.68

10.0.1.68配置(配置A):

[java] view plain copy
  1. a1.channels = c1
  2. a1.sources = r1
  3. a1.sinks = k70 k77 k85 k86 k87
  4. a1.sinkgroups = g1 g2 g3
  5. a1.sinkgroups.g1.sinks = k70 k85
  6. a1.sinkgroups.g1.processor.type = load_balance
  7. a1.sinkgroups.g1.processor.selector = round_robin
  8. a1.sinkgroups.g1.processor.backoff = true
  9. a1.sinkgroups.g2.sinks = k70 k86
  10. a1.sinkgroups.g2.processor.type = failover
  11. a1.sinkgroups.g2.processor.priority.k70 = 20
  12. a1.sinkgroups.g2.processor.priority.k86 = 10
  13. a1.sinkgroups.g2.processor.maxpenalty = 10000
  14. a1.sinkgroups.g3.sinks = k85 k87 k77
  15. a1.sinkgroups.g3.processor.type = failover
  16. a1.sinkgroups.g3.processor.priority.k85 = 20
  17. a1.sinkgroups.g3.processor.priority.k87 = 10
  18. a1.sinkgroups.g3.processor.priority.k77 = 5
  19. a1.sinkgroups.g3.processor.maxpenalty = 10000
  20. a1.channels.c1.type = memory
  21. a1.channels.c1.capacity = 1000
  22. a1.channels.c1.transactionCapacity = 100
  23. a1.sources.r1.channels = c1
  24. a1.sources.r1.type = avro
  25. a1.sources.r1.bind = 0.0.0.0
  26. a1.sources.r1.port = 41415
  27. a1.sinks.k87.channel = c1
  28. a1.sinks.k87.type = avro
  29. a1.sinks.k87.hostname = 10.0.1.87
  30. a1.sinks.k87.port = 41414
  31. a1.sinks.k86.channel = c1
  32. a1.sinks.k86.type = AVRO
  33. a1.sinks.k86.hostname = 10.0.1.86
  34. a1.sinks.k86.port = 41414
  35. a1.sinks.k85.channel = c1
  36. a1.sinks.k85.type = AVRO
  37. a1.sinks.k85.hostname = 10.0.1.85
  38. a1.sinks.k85.port = 41414
  39. a1.sinks.k77.channel = c1
  40. a1.sinks.k77.type = AVRO
  41. a1.sinks.k77.hostname = 10.0.1.77
  42. a1.sinks.k77.port = 41414
  43. a1.sinks.k70.channel = c1
  44. a1.sinks.k70.type = AVRO
  45. a1.sinks.k70.hostname = 10.0.1.70
  46. a1.sinks.k70.port = 41414

10.0.1.70和10.0.1.85Load balancing,均衡的方式为轮询调用。10.0.1.70和10.0.1.86为Failover,10.0.1.70和10.0.1.87为Failover

10.0.1.70,77,85,86,87配置:

[java] view plain copy
  1. a1.channels = c1
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels.c1.type = memory
  5. a1.channels.c1.capacity = 1000
  6. a1.channels.c1.transactionCapacity = 100
  7. a1.sources.r1.channels = c1
  8. a1.sources.r1.type = AVRO
  9. a1.sources.r1.bind = 0.0.0.0
  10. a1.sources.r1.port = 41414
  11. a1.sinks.k1.channel = c1
  12. a1.sinks.k1.type = file_roll
  13. a1.sinks.k1.sink.directory = /data/load/
  14. a1.sinks.k1.sink.rollInterval = 0

通过Avro获取到Event,存放到文件中。

每次往nginx发2w个请求,然后查看10.0.1.70,77,85,86,87四台服务器接受数据的情况。我们做几组测试:

注:表格中的 * 表示关闭关闭该服务器Flume进程。

测试一:

发送2w个请求到Nginx中,查看各个节点接受数据的行数:

服务器
10.0.1.70
10.0.1.77
10.0.1.85
10.0.1.86
10.0.1.87
总计
数据行数
3400
0
3459
6778
6363
20000

其实无论测试2w次请求,还是测试100w次请求,10.0.1.77都无法接受到数据。

测试二:

服务器
10.0.1.70
10.0.1.77
10.0.1.85
10.0.1.86
10.0.1.87(*)
总计
数据行数
6619
6300
6840
13878
6363
40000

问题1: 作为Failover的节点86,87为何可以接受数据,而77没有将接收数据呢?

作为failover,我们会认为只有一个节点生效,其他节点只有在优先级节点down掉才能替补上去,在Flume中关于failover的实现,首先我们要了解Flume加载配置文件是有顺序的。如果配置文件的顺序不同,会导致failover出乎我们的意料,现在我们把上面的(配置A)关于failover和load_balance修改成如下(部分代码):

[java] view plain copy
  1. ......
  2. a1.sinkgroups = g2 g1 g3
  3. a1.sinkgroups.g3.sinks = k70 k85
  4. a1.sinkgroups.g3.processor.type = load_balance
  5. a1.sinkgroups.g3.processor.selector = round_robin
  6. a1.sinkgroups.g3.processor.backoff = true
  7. a1.sinkgroups.g2.sinks = k70 k86
  8. a1.sinkgroups.g2.processor.type = failover
  9. a1.sinkgroups.g2.processor.priority.k70 = 20
  10. a1.sinkgroups.g2.processor.priority.k86 = 10
  11. a1.sinkgroups.g2.processor.maxpenalty = 10000
  12. a1.sinkgroups.g1.sinks = k85 k87 k77
  13. a1.sinkgroups.g1.processor.type = failover
  14. a1.sinkgroups.g1.processor.priority.k85 = 20
  15. a1.sinkgroups.g1.processor.priority.k87 = 10
  16. a1.sinkgroups.g1.processor.priority.k77 = 5
  17. a1.sinkgroups.g1.processor.maxpenalty = 10000
  18. ......

如果修改成如下的配置,启动时报如下错误:

[java] view plain copy
  1. WARN 2015-10-22 14:22:01 [org.apache.flume.conf.FlumeConfiguration] - Could not configure sink group g3 due to: No available sinks for sinkgroup: g3. Sinkgroup will be removed
  2. org.apache.flume.conf.ConfigurationException: No available sinks for sinkgroup: g3. Sinkgroup will be removed
  3. at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:754)
  4. at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:348)
  5. at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$0(FlumeConfiguration.java:313)
  6. at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
  7. at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
  8. 。。。。。。

报异常的原因,我们可以查看源码,找到答案,FlumeConfiguration类的isValid()方法:

[java] view plain copy
  1. sourceSet = validateSources(channelSet);
  2. sinkSet = validateSinks(channelSet);
  3. sinkgroupSet = validateGroups(sinkSet);

上述是主要的源码片段,可以Debug进去看看,大致的流程:以validateGroups为例,Flume根据sinkgroups顺序的解析配置文件,然后把sink放到变量名为usedSinks的Map当中, 每个sink只能使用一次 ,如果sink在前面某个sinkgroups已经使用,那么就会在该sinkgroups中删除这个sink。按上面的配置,Flume开始解析sinkgroups的g1,则g1包含k85,k87和k77三个有效sink;然后解析sinkgroups的g2,则g2包含k70和k86;解析sinkgroups的g3时,因为k70和k85已经在g1和g2存在了,所以g3包含的sink为空,才导致报如上的错误。也就是说Flume是根据usedSinks来实现failover和load_balance的,因为配置的原因,可能会跟你想象的效果相差甚远。

在AbstractConfigurationProvider类的getConfiguration方法,代码片段:

[java] view plain copy
  1. public MaterializedConfiguration getConfiguration() {
  2. MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  3. FlumeConfiguration fconfig = getFlumeConfiguration();//加载和验证配置文件的入口
  4. AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  5. if (agentConf != null) {
  6. Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
  7. Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
  8. Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
  9. try {
  10. loadChannels(agentConf, channelComponentMap);//初始化channel
  11. loadSources(agentConf, channelComponentMap, sourceRunnerMap);//初始化source
  12. loadSinks(agentConf, channelComponentMap, sinkRunnerMap);//初始化sink
  13. ......

验证完之后,加载Channels,Sources,Sinks,根据验证的结果g1,g2,g3的usedSinks分配如下(配置A):

g1 的usedSinks是:k70和k85

g2 的usedSinks是:k86

g3 的usedSinks是:k87,k77

以loadSinks为例,加载Sink,先调用AbstractConfigurationProvider类的loadSinks方法,然后调用loadSinkGroups方法来初始化Sink,g1的usedSinks有k70和k85,所以k70和k85这两个节点通过round_robin方式balance来接收数据;g2的usedSinks只有k86(由于k70已经在g1中被占用了),所以只有k86接收数据,自然也不会有failover的功能;g3的usedSinks有k87和k77,由于Failover会选取优先级最高的接收数据,所以k87接收数据,当k87挂掉的时候,k77替补上去接收数据。这也就是为何其他节点都可以接收数据,唯独只有k77没有数据的原因。

再者每个sinkgroups都会启动一个SinkRunner线程去调用FailoverSinkProcessor和LoadBalancingSinkProcessor的process()方法去获取数据,这也就是为啥Failover和balance都能接收数据的原因,具体的实现细节,可以自行阅读源码。

2,Failover的情况下,是否优先级越高的就先生效?

是的,同一个Failover下的sink都存放在TreeMap下,然后取最大优先级的Sink作为activeSink。

3,Failover的情况下,如果优先级相同是怎么做失败转移的?

优先级相同的sink节点在failover中只会有一个生效,看源码可以很容易的发现,因为Failover中live的Sink存放在TreeMap中,用优先级作为key,同等优先级的Sink只能保存一个。

[java] view plain copy
  1. @Override
  2. public void configure(Context context) {
  3. liveSinks = new TreeMap<Integer, Sink>();  //存活的Sink放在TreeMap中,且用priority作为Key
  4. failedSinks = new PriorityQueue<FailedSink>();
  5. Integer nextPrio = 0;
  6. String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
  7. if(maxPenaltyStr == null) {
  8. maxPenalty = DEFAULT_MAX_PENALTY;
  9. } else {
  10. try {
  11. maxPenalty = Integer.parseInt(maxPenaltyStr);
  12. } catch (NumberFormatException e) {
  13. logger.warn("{} is not a valid value for {}",
  14. new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
  15. maxPenalty  = DEFAULT_MAX_PENALTY;
  16. }
  17. }
  18. for (Entry<String, Sink> entry : sinks.entrySet()) {
  19. String priStr = PRIORITY_PREFIX + entry.getKey();
  20. Integer priority;
  21. try {
  22. priority =  Integer.parseInt(context.getString(priStr));
  23. } catch (Exception e) {
  24. priority = --nextPrio;
  25. }
  26. if(!liveSinks.containsKey(priority)) {
  27. liveSinks.put(priority, sinks.get(entry.getKey()));//priority作为Key
  28. } else {
  29. logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
  30. "duplicates that of sink {}", entry.getKey(),
  31. liveSinks.get(priority));
  32. }
  33. }
  34. activeSink = liveSinks.get(liveSinks.lastKey());//获取优先级最高的节点作为active节点

总结

1,load_balance配置中的Sink都可以接收数据。

2,load_balance根据均衡策略接收数据。

3,没有Sink既能failover又能load_balance。

4,failover中的Sink优先级不要设置为相同的值。

5,failover配置中的Sink只有优先级最高及没有被之前加载的sinkgroups占用的Sink接收数据,如果优先级高的Sink挂掉,则转到优先级次之的Sink。

6,failover可以做失败转移,如果因为加载顺序的问题,导致failover的Sink已经被占用,failover会造成配置在failover中的sink都能接收数据的假象,其实只是在剩余的sink中实施failover策略。

Flume之Failover和Load balancing原理及实例相关推荐

  1. 网络负载平衡(Network Load Balancing)的工作原理

    NLB算法的特点: 在NLB群集中,每台服务器都会有一个属于自己的静态IP地址,同时NLB群集中的所有服务器还有一个共同的IP地址-NLB群集地址: 当客户向NLB群集(NLB的虚拟IP地址)发起请求 ...

  2. Flume10:【案例】Sink Processors之 负载均衡:Load balancing Sink Processor

    一.Sink Processors 接下来看一下Sink处理器 Sink Processors类型包括这三种:Default Sink Processor.Load balancing Sink Pr ...

  3. 负载均衡(Load Balancing)学习笔记(二)

    概述 文章负载均衡(Load Balancing)学习笔记(一) 讲述了负载均衡的一般性原理,本文继续介绍常见的实现负载均衡的方法. HTTP重定向 HTTP重定向服务器是一台普通的Web服务器,用户 ...

  4. ASA/PIX: Load balancing between two ISP - options

    ASA/PIX: Load balancing between two ISP - options VERSION 7  Is it possible to load balance between ...

  5. 科普|什么是负载均衡(Load balancing)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者 | Enter 来源 | cnblogs.com/kingr ...

  6. 网页加载出现没有合适的负载均衡器_终于讲清楚了,什么是负载均衡(Load balancing)...

    什么是负载均衡(Load balancing) 在网站创立初期,我们一般都使用单台机器对台提供集中式服务,但随着业务量越来越大,无论性能还是稳定性上都有了更大的挑战.这时候我们就会想到通过扩容的方式来 ...

  7. nginx tcp代理_nginx——TCP/UDP Load Balancing

    nginx––– tcp and udp 代理 Introduction nginx 反向代理一般都是7层代理,进行http/https 协议层的转发:说起4层代理,一般想到的都是lvs 和 hapr ...

  8. Nginx(四):负载均衡Load balancing

    1.概念基础 Load balancing,即负载均衡,是一种计算机技术,用来在多个计算机(计算机集群).网络连接.CPU.磁盘驱动器或其他资源中分配负载,以达到最优化资源使用.最大化吞吐率.最小化响 ...

  9. UVA 12904 Load Balancing 暴力

    Load Balancing Time Limit: 20 Sec Memory Limit: 256 MB 题目连接 http://acm.hust.edu.cn/vjudge/contest/vi ...

最新文章

  1. Error: The 'decorators' plugin requires a 'decoratorsBeforeExport' option
  2. python matplotlib plt 画图 将刻度 替换为文字/字符以及画断断续续的分段函数
  3. 试用到期_各大化妆品品牌试用装广告
  4. WUSTOJ 1285: Factors(Java)
  5. linux - mysql 异常:Ignoring query to other database
  6. 基于小程序的定位健康打卡系统
  7. 关于原生HTML+CSS div的高度随着宽度按比例缩放
  8. win7忘记密码解决,Administrator账号密码忘记 解决办法
  9. 数字信号和模拟信号的区别
  10. Python实现逻辑门
  11. 2022年4月蓝桥杯软件类省赛:真题+解析
  12. three.js例子
  13. 长链剖分 - 攻略(BZOJ3252)
  14. 二级计算机vb答案,计算机二级VB考试练习题及答案
  15. PTA 电话聊天狂人 思路分析及代码解析
  16. JavaScript实现警告框
  17. 网上书城项目的书籍分类列表展示及新书上架和热销书籍效果展示功能(项目进度四)
  18. 最小堆的魅力!思路清晰求解「至少需要多少间会议室」
  19. 内网渗透、三层拓扑、红队考核靶场(ack123)
  20. 逃离迷宫 c++ bfs(中南大学考研机试题

热门文章

  1. C语言 #include指令
  2. 北理和国防科技计算机,“国防七子”实力真的非常强大吗?理科考生在他们之间该怎么做选择?...
  3. POI设置Excel单元格样式和背景色
  4. 七个习惯之二:以终为始
  5. 打架必备!擒敌拳1-16动连贯动作 分解动作
  6. VS2019添加git源代码管理-增加VS版本 16.10.4的GIT管理
  7. 一些你我所不知道的小知识
  8. 同等学力真题押题均是骗子!勿上当
  9. Http_4个新的http状态码:428、429、431、511
  10. 已知四种原子的质量,C/H/O/N分别为12/1/16/14,输入分子式,计算分子量。例如H2O,分子量为1*2+16=18,有如HC11N2,分子量为1+12*11+14*2=161