目录

1.Flume介绍.2

1.1 Flume数据源以及输出方式.2

1.2 Flume的核心概念.2

1.3 Flume结构.2

1.4 Flume安装测试.3

1.5 启动flume4

2.Kafka介绍.4

2.1 Kafka产生背景.4

2.2 Kafka部署结构.4

2.3 Kafka集群架构.4

2.4 Kafka基本概念.5

2.5 Kafka安装测试.5

3.Flume和Kafka整合.6

3.1两者整合优势.6

3.2 Flume和Kafka整合安装.6

3.3 启动kafka flume相关服务.7

3.4 Kafka和SparkStreaming整合.8

1. Flume介绍

Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

1.1 Flume数据源以及输出方式

Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。

Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。本测试研究中由kafka来接收数据。

1.2 Flume的核心概念

1.  Agent:使用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

2.  Client:生产数据,运行在一个独立的线程。

3.  Source:从Client收集数据,传递给Channel。

4.  Sink:从Channel收集数据,运行在一个独立线程。

5.  Channel:连接 sources和 sinks ,这个有点像一个队列。

6.  Events :可以是日志记录、 avro对象等。

1.3 结构

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:

Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、ContextualRouting、Backup Routes。如下图所示:

1.4 安装测试

解压apache-flume-1.6.0-bin.tar.gz:tar –zxvf apache-flume-1.6.0-bin.tar.gz

cp conf/flume-conf.properties.template conf/exec.conf

cp conf/flume-env.sh.template conf/flume-env.sh    配置JAVA_HOME

exec.conf配置如下:

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.channels = c2

a2.sources.r2.command=tail -n +0 -F /usr/local/hadoop/flume/test.log

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

验证安装:flume-ng version

1.5 启动flume

flume-ng agent --conf ./flume/conf/ -f ./flume/conf/exec.conf-Dflume.root.logger=DEBUG,console -n a2

发送数据和flume接收数据:


2.Kafka介绍

2.1 产生背景

Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。Kafka是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka 就出现了。Kafka 可以起到两个作用:

降低系统组网复杂度

降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速数据总线的作用。

2.2 部署结构

 

2.3 集群架构



2.4 基本概念

Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。

Partition:Topic 物理上的分组,一个topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。

Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。

Producers:消息和数据生产者,向 Kafka的一个 topic 发布消息的过程叫做 producers。

Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。

Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为broker。

2.5 安装测试

解压Kafka: tar -xzf kafka_2.10-0.8.1.1.tgz

启动ZK bin/zookeeper-server-start.shconfig/zookeeper.properties

启动服务bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1&

创建主题 bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test

查看主题 bin/kafka-topics.sh --list --zookeeperlocalhost:2181

查看主题详情 bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test

删除主题 bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181

创建生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning

3.Flume和Kafka整合

3.1 两者整合优势

Flume更倾向于数据传输本身,Kakfa是典型的消息中间件用于解耦生产者消费者。

具体架构上,Agent并没把数据直接发送到Kafka,在Kafka前面有层由Flume构成的forward。这样做有两个原因:

Kafka的API对非JVM系的语言支持很不友好,forward对外提供更加通用的HTTP接口。

forward层可以做路由、Kafka topic和Kafkapartition key等逻辑,进一步减少Agent端的逻辑。

数据有数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算。本文实时计算采用SparkStreaming做测试。

3.2 Flume和Kafka整合安装

Flume和Kafka插件包下载:https://github.com/beyondj2ee/flumeng-kafka-plugin

提取插件中的flume-conf.properties文件:修改如下:flume源采用exec

producer.sources.s.type = exec

producer.sources.s.command=tail -f -n+1/usr/local/Hadoop/flume/test.log

producer.sources.s.channels = c

修改producer代理的topic为test

将配置放到flume/cong/producer.conf中

复制插件包中的jar包到flume/lib中:删除掉版本不同的相同jar包,这里需要删除scala-compiler-z.9.2.jar包,否则flume启动会出现问题。

复制kafka/libs中的jar包到flume/lib中。

完整producer.conf:

producer.conf:

#agentsection

producer.sources= s

producer.channels= c

producer.sinks= r

#sourcesection

producer.sources.s.type= exec

#producer.sources.s.spoolDir= /usr/local/hadoop/flume/logs

#producer.sources.s.fileHeader= true

producer.sources.s.command= tail -f -n+1 /usr/local/hadoop/flume/aaa.log

producer.sources.s.channels= c

# Eachsink's type must be defined

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=localhost:9092

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=test

#Specifythe channel the sink should use

producer.sinks.r.channel= c

# Eachchannel's type is defined.

producer.channels.c.type= memory

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100

3.3 启动kafka flume相关服务

启动ZKbin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka服务 bin/kafka-server-start.sh config/server.properties

创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning

启动flume

flume-ng agent --conf./flume/conf/ -f ./flume/conf/producer.conf -Dflume.root.logger=DEBUG,console-n producer

向flume发送数据:

Kafka消费者数据:

3.4 Kafka和SparkStreaming整合

核心代码:

完整代码路径:

spark-1.4.0\examples\src\main\java\org\apache\spark\examples\streaming


执行参数:


发送数据:

由于flume采用exec数据源的方式,因此flume会监听配置的相应的文件: tail -f -n+1 /usr/local/Hadoop/flume/aaa.log

当向该文件追加文件时,flume就会获取追加的数据:

writetoflume.py

flume将获取的增量数据由sink发送给kafka,以下是kafka comsumer消费的数据

执行结果:

SparkStreaming订阅kafka的test主题的数据,将订阅的数据进行单词计数处理。

Flume+Kafka+SparkStreaming整合相关推荐

  1. flume+kafka+storm整合02---问题

    ###1.启动storm时总是报错 storm Session 0x0 for server null, unexpected error, closing socket connection ### ...

  2. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  3. 大数据集群搭建(12)——Flume和Kafka的整合

    Flume和Kafka的整合 1.配置flume,在flume的conf目录下新建文件(flume_kafka.conf)并配置.  ################################# ...

  4. Flume+Kafka+Strom基于伪分布式环境的结合使用

    --------------------------------------- 博文作者:迦壹 博客地址:Flume+Kafka+Strom基于伪分布式环境的结合使用 转载声明:可以转载, 但必须以超 ...

  5. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  6. 玩转Kafka—SpringGo整合Kafka

    玩转Kafka-Spring整合Kafka 1 新建Spring Boot项目,增加依赖 <dependencies><dependency><groupId>or ...

  7. Flume+Kafka双剑合璧玩转大数据平台日志采集

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...

  8. Flume+kafka+flink+es 构建大数据实时处理

    大数据目前的处理方法有两种:一种是离线处理,一种是实时处理.如何构建我们自己的实时数据处理系统我们选用flume+kafka+flink+es来作为我们实时数据处理工具.因此我们的架构是: flume ...

  9. flume+kafka消费数据【纯个人笔记】

    1.数据生产 使用java代码往一个文件中写入数据 package com.mobile;import java.io.*; import java.text.DecimalFormat; impor ...

最新文章

  1. Flutter Exception降到万分之几的秘密
  2. 运行 OpenCV-Python-Toturial-中文版 遇到的一些错误问题
  3. 对于写bash脚本的朋友,read命令是不可或缺的,需要实践一下就可以了解read命令的大致用途: 编写一个脚本: #!/bin/bash # hao32 test read echo -e Pl
  4. (GitHub标星6.9k)超详细的人工智能专家路线图,
  5. 如何选择开源许可证?(转)
  6. struts2学习笔记之十一:struts2的类型转换器
  7. ZH奶酪:Ionic中(弹出式窗口)的$ionicModal使用方法
  8. c++ linux 环境,C++ 环境设置
  9. 大家都在讲大数据,大数据是什么呢?
  10. 电脑全能工具箱,400+工具免费用
  11. 大数据基础概念思维导图
  12. xml.html一键解密工具,AutoJs一键解密
  13. Chrome快捷键整理
  14. terminate called after throwing an instance of ‘YAML::TypedBadConversion<int>‘ what(): bad conver
  15. React 合成事件
  16. 涉密计算机能用固态硬盘,电脑硬盘有坏道还能继续使用吗?不要抱侥幸心理,可能后悔莫及...
  17. 计算机术语我喜欢你,摩斯密码表白高级表白密码 摩斯密码我喜欢你数字
  18. 当我们谈论Unidbg时我们在谈什么
  19. 自连接、外连接和自连接查询
  20. Apple Watch耗电快?5个技巧让你的苹果手表超省电

热门文章

  1. 一招教你拿捏网上视频
  2. 如何制作俄罗斯方块(作业第一部分)
  3. 程序员需要了解的硬核知识之汇编语言(全)
  4. APS5840 DC-DC升降压恒流驱动IC 高效率 电流精度3%
  5. login和login local的区别
  6. 机械学习 - 数学基础 - 1
  7. 手写字符串识别的应用
  8. unsafe原理 java_Unsafe原理
  9. PHP设计模式之状态模式定义与用法详解
  10. c语言 查找数组元素