背景:Kafka消息总线的建成,使各个系统的数据得以在kafka节点中汇聚,接下来面临的任务是最大化数据的价值,让数据“慧”说话。

环境准备:

Kafka服务器*3。

CDH 5.8.3服务器*3,安装Flume,Solr,Hue,HDFS,Zookeeper服务。

Flume提供了可扩展的实时数据传输通道,Morphline提供了轻量级的ETL功能,SolrCloud+Hue提供了高性能搜索引擎和多样的数据展现形式。

12.20补充:(Hue的另外一种代替方式:Banana。)

2017.3.28补充:如果不使用CDH,而是使用开源的Flume+Solr=>纪录:Solr6.4.2+Flume1.7.0 +kafka集成

一.环境安装(略)

二.修改CDH默认配置:

1.在Flume配置界面配置Flume依赖Solr。

2.在Solr配置界面配置Solr使用Zookeeper存储配置文件,使用HDFS存储索引文件。

3.在Hue配置界面配置Hue依赖Solr

4.配置Hue界面可以被外网访问。

三.按场景配置各CDH服务及开发代码。

Kafka Topic: eventCount

Topic数据格式:

{"timestamp": "1481077173000","accountName": "旺小宝","tagNames": ["incoming"],"account": "WXB","eventType": "phone","eventTags": [{"value": 1,"name": "incoming"}]
}

1.Solr创建对应Collection。

1)登录任意CDH节点。生成collection配置文件骨架。

$ solrctl instancedir --generate $HOME/solr_configs

2)找到文件夹中的schema.xml文件,修改collection的schema。

第一步:修改field(先不要动type和dynamicField这些)。schema.xml中预定义了很多field,field对应的是json中需要被索引的字段。除了name=id,_root_,_version_不能去掉之外,其他的field可以去掉。

(Notice:json中的timestamp对应的是下面的eventTime,而下面的timestamp是flume接受kafka数据的时间。这是通过Morphline配置实现的转换)

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> <!-- points to the root document of a block of nested documents. Required for nesteddocument support, may be removed otherwise--><field name="_root_" type="string" indexed="true" stored="false"/><field name="account" type="string" indexed="true" stored="true"/><field name="accountName" type="string" indexed="true" stored="true"/><field name="subaccount" type="string" indexed="true" stored="true"/><field name="subaccountName" type="string" indexed="true" stored="true"/><field name="eventTime" type="tlong" indexed="false" stored="true"/><field name="eventType" type="string" indexed="true" stored="true"/><field name="eventTags" type="string" indexed="true" stored="true" multiValued="true"/><field name="_attachment_body" type="string" indexed="false" stored="true"/><field name="timestamp" type="tlong" indexed="false" stored="true"/><field name="_version_" type="long" indexed="true" stored="true"/>

第二步:去掉所有copy field。

第三步:添加动态字段dynamicFiled。

<dynamicField name="tws_*" type="text_ws" indexed="true" stored="true" multiValued="true"/>

3) 上传配置,创建collection

$ solrctl instancedir --create event_count_records solr_configs
$ solrctl collection --create event_count_records -s 3 -c event_count_records

2.Flume配置

创建一个新的角色组kafka2solr,修改代理名称为kafka2solr,并为该角色组分配服务器。

# 配置 source  channel sink 的名字
kafka2solr.sources = source_from_kafka
kafka2solr.channels = mem_channel
kafka2solr.sinks = solrSink# 配置Source类别为kafka
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = mem_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest#配置channel type为memory,通常生产环境中设置为file或者直接用kafka作为channel
kafka2solr.channels.mem_channel.type = memory
kafka2solr.channels.mem_channel.keep-alive = 60# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
kafka2solr.channels.mem_channel.capacity = 10000
kafka2solr.channels.mem_channel.transactionCapacity = 3000  # 配置sink到solr,并使用morphline转换数据
kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solrSink.channel = mem_channel
kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
kafka2solr.sinks.solrSink.morphlineId=morphline1
kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true

3.Flume-NG的Solr接收器配置

SOLR_LOCATOR : {# Name of solr collectioncollection : event_count_records# ZooKeeper ensemble 
  #CDH的专有写法,开源版本不支持。zkHost : "$ZK_HOST"}morphlines : [{id : morphline1importCommands : ["org.kitesdk.**", "org.apache.solr.**"]commands : [
{#Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json
   readJson{}
}{#读出来的json字段必须转换成filed才能被solr索引到
extractJsonPaths {flatten:truepaths:{
account:/account
accountName:/accountName
subaccount:/subaccount
subaccountName:/subaccountName
eventTime:/timestamp
eventType:/eventType
eventTags:"/eventTags[]/name"
#按分钟存timestamp
eventTimeInMinute_tdt:/timestamp
#按小时存timestamp
eventTimeInHour_tdt:/timestamp
#按天存timestamp
eventTimeInDay_tdt:/timestamp
#_tdt后缀会被动态识别为日期类型的索引字段
#按不同时间间隔存索引以增加查询性能
}}
}#转换long型时间为Date格式
{convertTimestamp {field : eventTimeInMinute_tdtinputFormats : ["unixTimeInMillis"]inputTimezone : UTCoutputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"outputTimezone : Asia/Shanghai
}}{convertTimestamp {field : eventTimeInHour_tdtinputFormats : ["unixTimeInMillis"]inputTimezone : UTCoutputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"outputTimezone : Asia/Shanghai
}}
{convertTimestamp {field : eventTimeInDay_tdtinputFormats : ["unixTimeInMillis"]inputTimezone : UTCoutputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/DAY'"outputTimezone : Asia/Shanghai
}}#kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存
{toString { field : _attachment_body }}#为每一条记录生成一个UUID
{generateUUID {field : id
}}#对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。
          {  sanitizeUnknownSolrFields {  # Location from which to fetch Solr schema  solrLocator : ${SOLR_LOCATOR} renameToPrefix:"tws_"}  }  #将数据导入到solr中                {loadSolr {solrLocator : ${SOLR_LOCATOR}}}]}
]

重启被影响的Flume节点,数据开始导入solr。

3.通过Hue查询Solr中的数据。

见Solr+Hue实战。

(作者卡尔:http://www.cnblogs.com/arli/p/6158771.html )

转载于:https://www.cnblogs.com/arli/p/6158771.html

json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引相关推荐

  1. python接口自动化(十九)--Json 数据处理---实战(详解)

    简介 上一篇说了关于json数据处理,是为了断言方便,这篇就带各位小伙伴实战一下.首先捋一下思路,然后根据思路一步一步的去实现和实战,不要一开始就盲目的动手和无头苍蝇一样到处乱撞,撞得头破血流后而放弃 ...

  2. Kafka实战-Flume到Kafka

    1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面 ...

  3. kafka+flume 实时数据处理

    kafka+flume 实时数据处理 1.监测数据处理技术路线 ​ ​ ​ 1.1数据层 2.介绍技术 我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成? ​ ...

  4. 第53课实战操作Kafka+Flume成功! Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战

    第53课实战操作Kafka+Flume成功! Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战 flume 安装在集群的worker4上,地址192.168.189. ...

  5. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  6. flume avro java 发送数据_flume将数据发送到kafka、hdfs、hive、http、netcat等模式的使用总结...

    1.source为http模式,sink为logger模式,将数据在控制台打印出来. conf配置文件如下: # Name the components on this agent a1.source ...

  7. 中国移动实时数据分析-基于spark+kafka+flume

    这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zoo ...

  8. ArduinoJson天行数据平台json数据处理

    ArduinoJson天行数据平台json数据处理 这次实战对象是对天行数据API平台的老黄历(农历)json数据进行关键信息提取. 实战对象:天行数据老黄历API接口https://www.tian ...

  9. Flume实时采集mysql数据到kafka中并输出

    环境说明 centos7 flume1.9.0(flume-ng-sql-source插件版本1.5.3) jdk1.8 kafka 2.1.1 zookeeper(这个我用的kafka内置的zk) ...

最新文章

  1. 学习C#要养成的好习惯
  2. oracle中asm磁盘不足,Oracle用户无法访问ASM磁盘组问题
  3. JVM(3)——Java GC分析
  4. python编程小知识_分享Python开发中要注意的十个小贴士
  5. 解决xp登陆域很慢的方法
  6. 数据权限设计(原创)
  7. Android的Spinner控件解决默认选中第一条问题
  8. 当Github上下载的代码需要安装作者写的库时
  9. rabbitmq python 发送失败_python rabbitmq no_ack=false
  10. dispatcherServlet流程图
  11. 祝贺在龙芯平台上编译jogamp(gluegen/jogl)2.3.2通过,并运行成功
  12. 单片机c语言 教案,《单片机C语言》 课程教案.doc
  13. 在局域网搭建mqtt服务器
  14. 小程序利用云函数获取手机号码
  15. MFC程序版本自动升级更新
  16. 新浪sae php,PHP+新浪微博开放平台+新浪云平台(SAE)1
  17. Illustrator插件开发-AI插件-aip格式-第一章 第一小节 概述
  18. Web中间件常见安全漏洞
  19. linux mysql 超级用户_Linux下MySQL忘记超级用户口令的解决办法linux操作系统 -电脑资料...
  20. 一个人一个微博、一个App一个故事:通过微博草根账号做英语学习App的“爱卡微口语”获晨脉创投天使投资

热门文章

  1. HTC Vive使用WebVR的方法以及启用后头显无画面的解决方法
  2. 商城quot;抢茅台quot;脚本曝光,开源可用!
  3. 统计能量法的用武之地
  4. 360浏览器如何屏蔽某搜索网站的热搜
  5. 多元函数积分学中的利用轮换对称性积分
  6. VIO算法总结(一)
  7. 下面个人免费版的xshell
  8. 用matlab编写逢七必过游戏规则,数字图像处理及应用(MATLAB)第4章
  9. centos安装TDengine
  10. 最新版小程序砍价商城源码前后端