1. Zookeeper Cluster Configuration

#   hostname         ip               software    notes
1   apollo.dt.com    192.168.56.181   zookeeper   kafka: broker.id=181
2   artemis.dt.com   192.168.56.182   zookeeper   kafka: broker.id=182
3   uranus.dt.com    192.168.56.183   zookeeper   kafka: broker.id=183
4   pandora.dt.com   192.168.56.184   zookeeper   kafka: broker.id=184

For detailed Zookeeper cluster setup instructions, refer to: CentOS安装配置Zookeeper集群.
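
As a quick reference, a minimal zoo.cfg for this four-node ensemble might look like the sketch below. The dataDir path and the myid-to-host mapping (1 through 4 for apollo through pandora) are assumptions here, not values from the linked article:

# /opt/zookeeper/conf/zoo.cfg (sketch; dataDir and myid mapping are assumed)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
server.1=apollo.dt.com:2888:3888
server.2=artemis.dt.com:2888:3888
server.3=uranus.dt.com:2888:3888
server.4=pandora.dt.com:2888:3888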

2. Kafka Cluster Configuration

#   hostname         ip               software    notes
1   apollo.dt.com    192.168.56.181   kafka       kafka: broker.id=181
2   artemis.dt.com   192.168.56.182   kafka       kafka: broker.id=182
3   uranus.dt.com    192.168.56.183   kafka       kafka: broker.id=183
4   pandora.dt.com   192.168.56.184   kafka       kafka: broker.id=184

For detailed Kafka cluster setup instructions, refer to: [CentOS7.0安装配置Kafka集群](http://blog.csdn.net/jssg_tzw/article/details/73106299)
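
As a sketch, each broker's server.properties differs mainly in broker.id and its listener address. The broker.id values come from the table above; the log.dirs path and the default ports 9092/2181 are assumptions:

# /opt/kafka/config/server.properties on apollo (adjust broker.id and listeners per host)
broker.id=181
listeners=PLAINTEXT://192.168.56.181:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.56.181:2181,192.168.56.182:2181,192.168.56.183:2181,192.168.56.184:2181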

3. Starting the Zookeeper and Kafka Clusters

3.1. Start Zookeeper

[root@apollo ~]# zkServer.sh start
[root@artemis ~]# zkServer.sh start
[root@uranus ~]# zkServer.sh start
[root@pandora ~]# zkServer.sh start
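
Optionally, confirm the ensemble is healthy by running zkServer.sh status on each node; it should report one leader and three followers:

[root@apollo ~]# zkServer.sh status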

3.2. Start the Kafka cluster

[root@apollo ~]# kafka-server-start.sh /opt/kafka/config/server.properties
[root@artemis ~]# kafka-server-start.sh /opt/kafka/config/server.properties
[root@uranus ~]# kafka-server-start.sh /opt/kafka/config/server.properties
[root@pandora ~]# kafka-server-start.sh /opt/kafka/config/server.properties
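
If topic auto-creation is disabled on the brokers, create the target topic before wiring up Flume. The replication factor and partition count below are arbitrary choices for this four-node cluster, and the ZooKeeper client port is assumed to be the default 2181:

[root@apollo ~]# kafka-topics.sh --create --zookeeper 192.168.56.181:2181 --replication-factor 3 --partitions 4 --topic dt-receipts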

4. Installing and Configuring Flume

4.1. Extract Flume to the /opt/flume directory

4.2. Create a new configuration file flume2kafka.conf under $FLUME_HOME/conf/

# Define the agent
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# Define the source
a1.sources.r1.type=avro
a1.sources.r1.bind=localhost
a1.sources.r1.port=44446

# Define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c1.keep-alive=30

# Define the sink (Kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = dt-receipts
a1.sinks.k1.kafka.bootstrap.servers = 192.168.56.181:9092,192.168.56.182:9092,192.168.56.183:9092,192.168.56.184:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
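
Note that the avro source's bind/port (localhost:44446) must match the Hostname and Port of the log4j Flume appender configured in section 5, and the sink's kafka.topic (dt-receipts) is the topic we consume in section 6.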

4.3. Start Flume

[itlocals-MacBook-Pro:flume david.tian]$  bin/flume-ng agent -n a1 -c conf/ --conf-file conf/flume2kafka.conf -Dflume.root.logger=DEBUG,console

5. Sending Logs from log4j to Flume

Download the source code from my GitHub repository: https://github.com/david-louis-tian/dBD

5.1. Only the pom.xml, the log-simulation code, and log4j.properties are shown here

  • pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dvtn.www</groupId>
    <artifactId>dBD</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>dBD</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20170516</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-ipc</artifactId>
            <version>1.8.2</version>
        </dependency>
    </dependencies>
</project>
  • log4j.properties
################### set log levels ###############
log4j.rootLogger = INFO,stdout,file,flume

################### flume ########################
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout = org.apache.log4j.PatternLayout
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 44446

################### stdout #######################
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold = INFO
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

################### file #########################
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold = INFO
log4j.appender.file.File = /Users/david.tian/logs/tracker/tracker.log
log4j.appender.file.Append = true
log4j.appender.file.DatePattern = '.'yyyy-MM-dd
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
  • SendReceipts.java
package com.dvtn.www.log4j.jsonlog;

import com.dvtn.www.log4j.logfile.LogProducer;
import com.dvtn.www.model.Area;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.*;

/**
 * Created by david.tian on 08/09/2017.
 */
public class SendReceipts {
    private static Logger LOG = LoggerFactory.getLogger(LogProducer.class);
    private static String path = SendReceipts.class.getResource("/").getPath();
    private static String areaJsonString;
    private static String city;
    private static String cityKey;
    private static String province;
    private static String provinceKey;
    private static int separator;
    private static String phonePrefix;
    //private static final Random rnd = new Random();

    private static String[] payers = {"Merchants", "Individuals"};
    private static String[] managers = {"david", "josen", "fab", "simon", "banana", "tom", "scott", "ekrn", "sunshine", "lily", "kudu", "scala", "spark", "flume", "storm", "kafka", "avro", "linux"};
    private static String[] terminalTypes = {"RNM", "CNM", "RNM", "GNM", "CNJ", "GNJ", "RNJ", "GNM", "CNM"};
    private static String[] stores = {"连锁店", "分营店", "工厂店", "会员店", "直销店"};
    private static String[] items = {"面包", "酒", "油", "牛奶", "蔬菜", "猪肉", "牛肉", "羊肉", "曲奇", "手机", "耳机", "面粉", "大米", "糖", "苹果", "茶叶", "书", "植物", "玩具", "床", "锅", "牙膏", "洗衣粉", "酱油", "金鱼", "干货"};
    private static String[] itemsType = {"食物", "酒水", "饮料", "日用品", "电子", "数码", "娱乐", "家俱"};

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                Random rnd = new Random();
                ProduceReceipts pr = new ProduceReceipts();
                areaJsonString = pr.readJSON(path, "area.json");

                String transactionID = System.currentTimeMillis() + "" + Math.round(Math.random() * 9000 + 1000);
                String transactionDate = System.currentTimeMillis() + "";
                String taxNumber = Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000);
                String invoiceId = System.currentTimeMillis() + "";
                String invoiceNumber = Math.round(Math.random() * 900000000 + 100000000) + "";
                String invoiceDate = System.currentTimeMillis() + "";

                List<Area> listArea = pr.listArea(areaJsonString);
                int idx = rnd.nextInt(listArea.size());
                String provinceID = listArea.get(idx).getProvinceID();
                String provinceName = listArea.get(idx).getProvinceName();
                String cityID = listArea.get(idx).getCityID();
                String cityName = listArea.get(idx).getCityName();
                String telephone = provinceID + "-" + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000);

                int managerSize = managers.length;
                String manger = managers[rnd.nextInt(managerSize)];
                int payerSize = payers.length;
                String payer = payers[rnd.nextInt(payerSize)];
                String operator = "OP" + Math.round(Math.random() * 90000 + 10000);
                int terminalTypeSize = terminalTypes.length;
                String terminalNumber = terminalTypes[rnd.nextInt(terminalTypeSize)] + Math.round(Math.random() * 90000 + 10000);
                String account = pr.StringReplaceWithStar(Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000));
                String tcNumber = Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + " " + Math.round(Math.random() * 9000 + 1000) + "";

                // Read the Avro schema file (contents are only consumed for side-effect checks here)
                File file = new File(path + "receipts.avsc");
                String line = null;
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(file));
                    while ((line = reader.readLine()) != null) {
                        // System.out.println("========>" + line);
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException e1) {
                        }
                    }
                }

                try {
                    // Parse the whole schema
                    Schema schema = new Schema.Parser().parse(new File(path + "receipts.avsc"));
                    GenericRecord record = new GenericData.Record(schema);

                    // Build the store sub-record from the "store" field's schema
                    int storesSize = stores.length;
                    Schema.Field storeField = schema.getField("store");
                    Schema storeSchema = storeField.schema();
                    GenericRecord storeRecord = new GenericData.Record(storeSchema);
                    String storeNumber = Math.round(Math.random() * 9000 + 1000) + "";
                    String address = provinceName + cityName;
                    String storeName = provinceName + cityName + stores[rnd.nextInt(storesSize)];
                    storeRecord.put("store_number", storeNumber);
                    storeRecord.put("store_name", storeName);
                    storeRecord.put("address", address);

                    // Build the product sub-records from the "products" field's schema
                    int itemsSize = items.length;
                    int itemsTypeSize = itemsType.length;
                    List<GenericRecord> productRecordList = new ArrayList<GenericRecord>();
                    Schema.Field productField = schema.getField("products");
                    Schema productSchema = productField.schema();
                    for (int i = 0; i < 10; i++) {
                        String itemName = items[rnd.nextInt(1000) % itemsSize];
                        String itemType = itemsType[rnd.nextInt(1000) % itemsTypeSize];
                        String quantity = String.valueOf(rnd.nextInt(100));
                        String price = String.valueOf(rnd.nextFloat() * 100);
                        String discount = String.valueOf(rnd.nextFloat());
                        GenericRecord productRecord = new GenericData.Record(productSchema);
                        productRecord.put("item", itemName);
                        productRecord.put("item_type", itemType);
                        productRecord.put("quantity", quantity);
                        productRecord.put("price", price);
                        productRecord.put("discount", discount);
                        productRecordList.add(productRecord);
                    }

                    record.put("transaction_id", transactionID);
                    record.put("transaction_date", transactionDate);
                    record.put("invoice_id", invoiceId);
                    record.put("invoice_number", invoiceNumber);
                    record.put("telephone", telephone);
                    record.put("payer", payer);
                    record.put("store", storeRecord);
                    record.put("operator", operator);
                    record.put("terminal_number", terminalNumber);
                    record.put("products", productRecordList);
                    record.put("account", account);
                    record.put("tc_number", tcNumber);

                    // Logging the record sends it through the Flume appender to Kafka
                    LOG.info(record.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }
}
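
Before running the full generator, it can help to sanity-check the log4j → Flume → Kafka path with a single log call. The class below is a hypothetical sketch (not part of the repository); with the log4j.properties above on the classpath, its one INFO line should arrive in the dt-receipts topic:

package com.dvtn.www.log4j.jsonlog;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical smoke test, not in the repo.
public class FlumeLogSmokeTest {
    private static final Logger LOG = LoggerFactory.getLogger(FlumeLogSmokeTest.class);

    public static void main(String[] args) throws InterruptedException {
        // This single line goes to stdout, the rolling file, and the Flume
        // appender (localhost:44446) configured in log4j.properties.
        LOG.info("flume-kafka pipeline smoke test");
        // Give the appender a moment to deliver before the JVM exits.
        Thread.sleep(2000);
    }
}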

6. Verification

Run kafka-console-consumer.sh on one of the Kafka nodes to read the topic "dt-receipts" and check whether the logs have been collected by Kafka:

[root@apollo ~]# kafka-console-consumer.sh --bootstrap-server 192.168.56.181:9092,192.168.56.182:9092,192.168.56.183:9092,192.168.56.184:9092 --from-beginning --topic dt-receipts

We can see that the data has been collected into the dt-receipts topic in Kafka.
