Flume+Kafka+SparkStreaming整合
目录
1.Flume介绍.2
1.1 Flume数据源以及输出方式.2
1.2 Flume的核心概念.2
1.3 Flume结构.2
1.4 Flume安装测试.3
1.5 启动flume4
2.Kafka介绍.4
2.1 Kafka产生背景.4
2.2 Kafka部署结构.4
2.3 Kafka集群架构.4
2.4 Kafka基本概念.5
2.5 Kafka安装测试.5
3.Flume和Kafka整合.6
3.1两者整合优势.6
3.2 Flume和Kafka整合安装.6
3.3 启动kafka flume相关服务.7
3.4 Kafka和SparkStreaming整合.8
1. Flume介绍
Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
1.1 Flume数据源以及输出方式
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。
Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。本测试研究中由kafka来接收数据。
1.2 Flume的核心概念
1. Agent:使用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
2. Client:生产数据,运行在一个独立的线程。
3. Source:从Client收集数据,传递给Channel。
4. Sink:从Channel收集数据,运行在一个独立线程。
5. Channel:连接 sources和 sinks ,这个有点像一个队列。
6. Events :可以是日志记录、 avro对象等。
1.3 结构
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:
Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、ContextualRouting、Backup Routes。如下图所示:
1.4 安装测试
解压apache-flume-1.6.0-bin.tar.gz:tar –zxvf apache-flume-1.6.0-bin.tar.gz
cp conf/flume-conf.properties.template conf/exec.conf
cp conf/flume-env.sh.template conf/flume-env.sh 配置JAVA_HOME
exec.conf配置如下:
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.channels = c2
a2.sources.r2.command=tail -n +0 -F /usr/local/hadoop/flume/test.log
# Describe the sink
a2.sinks.k2.type = logger
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
验证安装:flume-ng version
1.5 启动flume
flume-ng agent --conf ./flume/conf/ -f ./flume/conf/exec.conf-Dflume.root.logger=DEBUG,console -n a2
发送数据和flume接收数据:
2.Kafka介绍
2.1 产生背景
Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。Kafka是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka 就出现了。Kafka 可以起到两个作用:
降低系统组网复杂度
降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速数据总线的作用。
2.2 部署结构
2.3 集群架构
2.4 基本概念
Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
Partition:Topic 物理上的分组,一个topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。
Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
Producers:消息和数据生产者,向 Kafka的一个 topic 发布消息的过程叫做 producers。
Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为broker。
2.5 安装测试
解压Kafka: tar -xzf kafka_2.10-0.8.1.1.tgz
启动ZK bin/zookeeper-server-start.shconfig/zookeeper.properties
启动服务bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1&
创建主题 bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test
查看主题 bin/kafka-topics.sh --list --zookeeperlocalhost:2181
查看主题详情 bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test
删除主题 bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181
创建生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning
3.Flume和Kafka整合
3.1 两者整合优势
Flume更倾向于数据传输本身,Kakfa是典型的消息中间件用于解耦生产者消费者。
具体架构上,Agent并没把数据直接发送到Kafka,在Kafka前面有层由Flume构成的forward。这样做有两个原因:
Kafka的API对非JVM系的语言支持很不友好,forward对外提供更加通用的HTTP接口。
forward层可以做路由、Kafka topic和Kafkapartition key等逻辑,进一步减少Agent端的逻辑。
数据有数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算。本文实时计算采用SparkStreaming做测试。
3.2 Flume和Kafka整合安装
Flume和Kafka插件包下载:https://github.com/beyondj2ee/flumeng-kafka-plugin
提取插件中的flume-conf.properties文件:修改如下:flume源采用exec
producer.sources.s.type = exec
producer.sources.s.command=tail -f -n+1/usr/local/Hadoop/flume/test.log
producer.sources.s.channels = c
修改producer代理的topic为test
将配置放到flume/cong/producer.conf中
复制插件包中的jar包到flume/lib中:删除掉版本不同的相同jar包,这里需要删除scala-compiler-z.9.2.jar包,否则flume启动会出现问题。
复制kafka/libs中的jar包到flume/lib中。
完整producer.conf:
producer.conf:
#agentsection
producer.sources= s
producer.channels= c
producer.sinks= r
#sourcesection
producer.sources.s.type= exec
#producer.sources.s.spoolDir= /usr/local/hadoop/flume/logs
#producer.sources.s.fileHeader= true
producer.sources.s.command= tail -f -n+1 /usr/local/hadoop/flume/aaa.log
producer.sources.s.channels= c
# Eachsink's type must be defined
producer.sinks.r.type= org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=localhost:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specifythe channel the sink should use
producer.sinks.r.channel= c
# Eachchannel's type is defined.
producer.channels.c.type= memory
producer.channels.c.capacity= 1000
producer.channels.c.transactionCapacity= 100
3.3 启动kafka flume相关服务
启动ZKbin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka服务 bin/kafka-server-start.sh config/server.properties
创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning
启动flume
flume-ng agent --conf./flume/conf/ -f ./flume/conf/producer.conf -Dflume.root.logger=DEBUG,console-n producer
向flume发送数据:
Kafka消费者数据:
3.4 Kafka和SparkStreaming整合
核心代码:
完整代码路径:
spark-1.4.0\examples\src\main\java\org\apache\spark\examples\streaming
执行参数:
发送数据:
由于flume采用exec数据源的方式,因此flume会监听配置的相应的文件: tail -f -n+1 /usr/local/Hadoop/flume/aaa.log
当向该文件追加文件时,flume就会获取追加的数据:
writetoflume.py
flume将获取的增量数据由sink发送给kafka,以下是kafka comsumer消费的数据
执行结果:
SparkStreaming订阅kafka的test主题的数据,将订阅的数据进行单词计数处理。
Flume+Kafka+SparkStreaming整合相关推荐
- flume+kafka+storm整合02---问题
###1.启动storm时总是报错 storm Session 0x0 for server null, unexpected error, closing socket connection ### ...
- Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示
http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...
- 大数据集群搭建(12)——Flume和Kafka的整合
Flume和Kafka的整合 1.配置flume,在flume的conf目录下新建文件(flume_kafka.conf)并配置. ################################# ...
- Flume+Kafka+Strom基于伪分布式环境的结合使用
--------------------------------------- 博文作者:迦壹 博客地址:Flume+Kafka+Strom基于伪分布式环境的结合使用 转载声明:可以转载, 但必须以超 ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- 玩转Kafka—SpringGo整合Kafka
玩转Kafka-Spring整合Kafka 1 新建Spring Boot项目,增加依赖 <dependencies><dependency><groupId>or ...
- Flume+Kafka双剑合璧玩转大数据平台日志采集
点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...
- Flume+kafka+flink+es 构建大数据实时处理
大数据目前的处理方法有两种:一种是离线处理,一种是实时处理.如何构建我们自己的实时数据处理系统我们选用flume+kafka+flink+es来作为我们实时数据处理工具.因此我们的架构是: flume ...
- flume+kafka消费数据【纯个人笔记】
1.数据生产 使用java代码往一个文件中写入数据 package com.mobile;import java.io.*; import java.text.DecimalFormat; impor ...
最新文章
- Flutter Exception降到万分之几的秘密
- 运行 OpenCV-Python-Toturial-中文版 遇到的一些错误问题
- 对于写bash脚本的朋友,read命令是不可或缺的,需要实践一下就可以了解read命令的大致用途: 编写一个脚本: #!/bin/bash # hao32 test read echo -e Pl
- (GitHub标星6.9k)超详细的人工智能专家路线图,
- 如何选择开源许可证?(转)
- struts2学习笔记之十一:struts2的类型转换器
- ZH奶酪:Ionic中(弹出式窗口)的$ionicModal使用方法
- c++ linux 环境,C++ 环境设置
- 大家都在讲大数据,大数据是什么呢?
- 电脑全能工具箱,400+工具免费用
- 大数据基础概念思维导图
- xml.html一键解密工具,AutoJs一键解密
- Chrome快捷键整理
- terminate called after throwing an instance of ‘YAML::TypedBadConversion<int>‘ what(): bad conver
- React 合成事件
- 涉密计算机能用固态硬盘,电脑硬盘有坏道还能继续使用吗?不要抱侥幸心理,可能后悔莫及...
- 计算机术语我喜欢你,摩斯密码表白高级表白密码 摩斯密码我喜欢你数字
- 当我们谈论Unidbg时我们在谈什么
- 自连接、外连接和自连接查询
- Apple Watch耗电快?5个技巧让你的苹果手表超省电