batchsize和transactionCapacity的区别

batchSize(Source和Sink中一体事务提交最大的envn(消息)条数: 是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。
即一次性你可以处理batchSize个event,这个一次性就是指在一个事务中。当event数量超过batchSize事务就会提交
这个参数值越大,每个事务提交的范围就越大,taskList的清空等操作次数会减少,因此性能肯定会提升,但是可能在出错时,回滚的返回也会变大。

**transactionCapacity (channel中规定的一次事务中包含envn(消息)条数:**参数官方解释:channel在一次事务中传递事件的最大数量。 其实就是putList和takeList的容量大小。在flume1.5版本中SpillableMemoryChannel的putList和takeList的长度为largestTakeTxSize和largestPutTxSize参数,该参数值为5000

**capacity(channel可以存储envn(消息)条数)**参数官方解释:The maximum number of events stored in the channel,channel存储事件的最大数量

注意,这里有一个隐晦的地方,就是batchSize一定不能大于transactionCapacity

transactionCapacity这个概念来自于通道中,不同于batchSize(Source,Sink),说白了,就是batchSize与transactionCapactiy相互传递数据,好比两个人相互递交文件,所以batchSize的容量要小于在channel内的transactionCapactiy,否则文件会溢出。

event

一个envn包含一条数据可以把一个event当做一个http

header是一个map,body是一个字节数组,body才是我们实际使用中真正传输的数据,header传输的数据,我们是不会是sink出去的。

byte[] bytes = event.getBody(); 获取enent存储的实际数据。
header是一个map,body是一个字节数组,body是真正传输的数据,header传输的数据不会被sink出去。
在source端产出event的时候,通过header去区别对待不同的event,然后在sink端的时候,我们就可以通过header中的key来将不同的event输出到对应的sink下游去,这样就将event分流出去了,

可以根据event的header中的key将其放入不同的channel中,紧接着,再通过配置多个sink去不同的channel取出event,将其分流到不同的输出端

每个sink配置的通道区别开就行了

1.Flume 概述

1.1Flume 定义和作用

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传
输的系统。Flume 基于流式架构,灵活简单。
flume只能传输文本文件,不能传输图片
Flume最大作用就是提高数据流的实时性

Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS

1.2Flume的基础架构

Flume 组成架构如图


1.2.1 Agent
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有 3 个部分组成,Source、Channel、Sink。
1.2.2 Source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种
格式的日志数据,包括 avro(阿波罗)、thrift、exec、jms、spooling directory、netcat、sequence
generator、syslog、http、legacy。
用的比较多的source

1.2.3 Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储
或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfsloggeravro、thrift、ipc、fileHBase、solr、自定
义。
1.2.4 Channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运
作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个
Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel 以及 Kafka Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适
用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕
机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数
据。
1.2.5 Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,
Body 用来存放该条数据,形式为字节数组。

  1. event是flume中处理消息的基本单元,由零个或者多个header和正文body组成。
  2. Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。
  3. Body是一个字节数组,包含了实际的内容及实际的。

2.Flume 快速入门

2.1 Flume 安装部署
2.1.1 安装地址
1) Flume 官网地址
http://flume.apache.org/
2)文档查看地址
http://flume.apache.org/FlumeUserGuide.html
3)下载地址
http://archive.apache.org/dist/flume/
2.1.2 安装部署
1)将 apache-flume-1.7.0-bin.tar.gz 上传到 linux 的/opt/software 目录下
2)解压 apache-flume-1.7.0-bin.tar.gz 到/opt/module/目录下

[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-
bin.tar.gz -C /opt/module/

3)修改 apache-flume-1.7.0-bin 的名称为 flume
[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
4)将 flume/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flumeenv.sh 文件

[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

2.2 Flume 入门案例
2.2.1 监控端口数据官方案例
1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
2)需求分析:

3)实现步骤:
1.安装 netcat 工具

[atguigu@hadoop102 software]$ sudo yum install -y nc

2.判断 44444 端口是否被占用

[atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444

3.创建 Flume Agent 配置文件 flume-netcat-logger.conf 在 flume 目录下创建 job 文件夹并进入 job 文件夹。

[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/

在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。

[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf

在 flume-netcat-logger.conf 文件中添加如下内容。
注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html

添加内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

一个channel可以绑定多个sinks,一个sinks只可以绑定一个channel
一个channel可以绑定多个sources,一个sources只可以绑定多个channel
4. 先开启 flume 监听端口
第一种写法:

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a1 --conf-file job/flume-netcat-logger.conf -
Dflume.root.logger=INFO,console

第二种写法(推荐使用):

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:
–conf/-c:表示配置文件存储在 conf/目录
–name/-n:表示给 agent 起名为 a1
–conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf
文件。-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger
参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、
error。 5.使用 netcat 工具向本机的 44444 端口发送内容

[atguigu@hadoop102 ~]$ nc localhost 44444
hello
atguigu

6.在 Flume 监听页面观察接收数据情况

3.Flume 事务

4.Flume Agent 内部原理


重要组件:
1)ChannelSelector
ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。
ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。 2)2)SinkProcessor
SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
DefaultSinkProcessor 对 应 的 是 单 个 的 Sink , LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。

5.Flume 拓扑结构

5.1简单串联


这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量,flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
复制和多路复用

5.2复制和多路复用


Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。

5.3负载均衡和故障转移

解决单台机器压力过大。
agent1轮询分发到agent2和agent3、agent4。

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

5.4聚合(推荐使用)


这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

5.5生产话环境中:将聚合+负载均衡

6.自定义 Interceptor(拦截器)

实现步骤
1.创建一个 maven 项目,并引入以下依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version>
</dependency>

2.定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class CustomInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();if (body[0] < 'z' && body[0] > 'a') {event.getHeaders().put("type", "letter");} else if (body[0] > '0' && body[0] < '9') {event.getHeaders().put("type", "number");}return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomInterceptor();}@Overridepublic void configure(Context context) {}} }

3.编辑 flume 配置文件
为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =
com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume2 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。


a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4.分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。
5.在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。
6.观察 hadoop103 和 hadoop104 打印的日志。

7.自定义 Source

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能
满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
getBackOffSleepIncrement()//暂不用
getMaxBackOffSleepInterval()//暂不用
configure(Context context)//初始化 context(读取配置文件内容)
process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version>
</dependency>
</dependencies>

8.自定义 Sink

9.Flume 数据流监控

3.8.1 Ganglia 的安装与部署

10.Flume 对接 Kafka

有了flume可以直接对接hdfs,spark 为什么还要flume->kafka->spark:
flume sink不能动态的增加提供给多个业务线使用, kafka消费可以动态的增加消费者,且副本不需要增加。

1)配置 flume(flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) 启动 kafkaIDEA 消费者
3) 进入 flume 根目录下,启动 flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) 向 /opt/module/data/flume.log 里追加数据,查看 kafka 消费者消费情况

$ echo hello >> /opt/module/data/flume.log

11.企业真实面试题(重点)


Flume 尚硅谷2019相关推荐

  1. HTMLCSS学习笔记 纯知识点版 (尚硅谷2019李立超版)

    此版为纯知识点无练习笔记版本 目录 1. HTML基础 1.1 网页的结构 1.2 HTML中的"实体" 1.3 meta 标签 1.4 语义化标签 1.7 列表 1.8 超链接 ...

  2. 尚硅谷-互联网大厂高频重点面试题 (第2季)JUC多线程及高并发

    本期内容包括 JUC多线程并发.JVM和GC等目前大厂笔试中会考.面试中会问.工作中会用的高频难点知识. 斩offer.拿高薪.跳槽神器,对标阿里P6的<尚硅谷_互联网大厂高频重点面试题(第2季 ...

  3. 2019尚硅谷大数据Maven篇一 Maven安装和概念

    2019尚硅谷大数据Maven篇一安装和概念 tags: golang 2019尚硅谷 categories: java Maven 安装 依赖 概念 文章目录 2019尚硅谷大数据Maven篇一安装 ...

  4. 2019尚硅谷大数据Javaweb篇三 Ajax、JSTL、会话技术、过滤器、监听器、xml、json

    2019尚硅谷大数据 Javaweb篇三Ajax.JSTL.会话技术.过滤器.监听器 tags: 大数据 2019尚学堂 categories: Ajax异步请求 JSTL中的if和forEach 会 ...

  5. 尚硅谷全套课件整理:Java、前端、大数据、安卓、面试题

    目录 Java 尚硅谷 IT 精英计划 JavaSE 内部学习笔记.pdf 尚硅谷 Java 基础实战之银行项目.pdf 尚硅谷 Java 技术之 JDBC.pdf 尚硅谷 Java 技术之 Java ...

  6. 尚硅谷谷粒学院学习笔记9--前台用户登录,注册,整合jwt,微信登录

    用户登录业务 单点登录(Single Sign On),简称SSO. 用户只需要登陆一次就可以访问所有相互信任的应用系统 单点登录三种常见方式 session广播机制实现 使用redis+cookie ...

  7. 【JVM】最全笔记(黑马+尚硅谷+张龙整合笔记)

    本身整合了如下视频的笔记,并进行了整理:尚硅谷周阳.张龙.黑马程序员 黑马ppt非常好:https://download.csdn.net/download/hancoder/12834607 本文及 ...

  8. 尚硅谷YYDS (课件资料)

    面试大保健 链接:https://pan.baidu.com/s/1duUHb4AwOnW9jycDnUCRVA 提取码:undf 尚硅谷大数据技术之 StarRocks 链接:https://pan ...

  9. b站尚硅谷springmvc学习视频:springmvc文档

    文章目录 一.SpringMVC简介 (b站尚硅谷springmvc学习视频:springmvc文档) 1.什么是MVC 2.什么是SpringMVC 3.SpringMVC的特点 二.HelloWo ...

最新文章

  1. 为ThinkPad T420增加一根4G内存
  2. flex中toolTip汇总
  3. oracle 表分区[三]
  4. 13.4 常见的时期类和Math类
  5. 遇见C++ AMP:在GPU上做并行计算
  6. 正则化与L0、L1、L2范数祥解
  7. 惊呆了!这篇论文全文都是脏话,可编辑部居然对它评价极佳并发表了!
  8. 总结下用Vue.js和webpack遇到的问题
  9. 凸优化第九章无约束优化 9.2下降方法
  10. VC++ 6.0 快捷键
  11. android大作业计算器,2015大作业简易计算器实验报告.doc
  12. HIVE的基本使用05(HSQL调优)
  13. 5.1(电脑技能经验) 美图秀秀批量处理图片大小,针对CSDN博客上传图片要求,小于2M,图片大小设计712*400
  14. 使用medusa进行ssh爆破
  15. 巧用Q盘搭建SVN服务器
  16. 一年有四季的c语言编程,一年是否有四季?
  17. 计算机信息心得体会作文50字,考试感想作文50字5篇
  18. IOS开发插件和工具总结
  19. Spring Data JDBC自动生成的增删改查CRUD分页、排序SQL语句非常简洁没有多余的SQL
  20. 通达信VOL实战监测,很实用 可以替代成交量指标公式 源码 效果图

热门文章

  1. Zookeeper之开源客户端ZkClient
  2. OpenCV+ Qt Designer 开发人脸识别考勤系统
  3. 从环境部署到运营推广,蚂蚁特色的一站式小程序开发体验
  4. Python应用与实践-转自(吴秦(Tyler))
  5. 评价类模型(层次分析法与模糊评价模型)
  6. ganache-cli命令行参数
  7. 前端开发性能优化-Vue响应式优化
  8. 北京大学软件工程课程听课笔记---软件工程课程介绍第二讲
  9. 运动员svg图标素材推荐 精品 小众
  10. #####ELK#####