目录

一、数据采集/消费(存储)

二、数据采集

三、数据消费

四、编写代码

在project-ct.pom

在ct.consume下

在ct.consumer.bean

在ct.consumer.dao

在ct-consumer的resources

在ct-common.pom

在ct.common.api

在ct.common.bean

在ct.common.constant

在ct-common的resources

在ct-consumer-coprocessor

四、数据消费方案优化

打包jar

五、数据消费测试

1. 打包HBase消费者代码

2.在hbase的页面也看到文件出现


一、数据采集/消费(存储)

欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。

flume:cloudera公司研发

适合下游数据消费者不多的情况;

适合数据安全性要求不高的操作;

适合与Hadoop生态圈对接的操作。

kafka:linkedin公司研发

适合数据下游消费众多的情况;

适合数据安全性要求较高的操作(支持replication);

因此我们常用的一种模型是:

线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS

消费存储模块流程如图2所示:

二、数据采集

思路:

a) 配置kafka,启动zookeeper和kafka集群;

b) 创建kafka主题;

c) 启动kafka控制台消费者(此消费者只用于测试使用);

d) 配置flume,监控日志文件;

e) 启动flume监控任务;

f) 运行日志生产脚本;

g) 观察测试。

1)启动zookeeper,kafka集群

$/opt/module/kafka/bin/kafka-server-start.sh
/opt/module/kafka/config/server.properties

2)创建kafka主题

[root@hadoop01 kafka]# bin/kafka-topics.sh --zookeeper hadoop01:2181 --topic ct --create --replication-factor 2 --partitions 3

检查一下是否创建主题成功:

[root@hadoop01 kafka]# bin/kafka-topics.sh --zookeeper hadoop01:2181 --list

3)启动kafka控制台消费者,等待flume信息的输入

[root@hadoop01 kafka]# bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 -topic ct

4)配置flume(flume-kafka.conf)

[root@hadoop01 data]# vim flume-2-kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5)启动flume

[root@hadoop01 flume]# bin/flume-ng agent -c conf/ -n a1 -f /opt/module/data/flume-2-kafka.conf

6)有数据不断生产中

三、数据消费

如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。

思路:

a) 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;

b) 既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;

c) 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。

四、编写代码

在project-ct.pom

<modules><module>ct-common</module><module>ct-producer</module><module>ct-consumer</module><module>ct-consumer-coprocessor</module></modules></project>

创建新的module项目:ct_consumer

pom.xml文件配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>lenovo-project-ct</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>ct-consumer</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>0.11.0.0</version></dependency><dependency><groupId>org.example</groupId><artifactId>ct-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>
</project>

在ct.consume下

创建Bootstrap


/*** @program: IntelliJ IDEA* @description: 启动消费者** @create: 2022-10-21 15:36*//*** 启动消费者** 使用kafka消费者获取Flume采集的数据** 将数据存储到Hbase中去*/
public class Bootstrap {public static void main(String[] args) throws Exception {//创建消费者Consumer consumer = new CalllogConsumer();//消费数据consumer.consume();//关闭资源consumer.close();}
}

在ct.consumer.bean

创建CalllogConsumer


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;/*** @program: IntelliJ IDEA* @description: 通话日志消费对象* @author: * @create: 2022-10-21 15:41*/
public class CalllogConsumer implements Consumer {/*** 消费数据*/public void consume() {try{//创建配置对象Properties prop = new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));//获取flume采集的数据KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);//关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));//HBase数据访问对象HBaseDao dao = new HBaseDao();//初始化dao.init();//消费数据while(true){ConsumerRecords<String,String> consumerRecords = consumer.poll(100);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());//插入数据dao.insertData(consumerRecord.value());
//                    Calllog log = new Calllog(consumerRecord.value());
//                    dao.insertData(log);}}}catch (Exception e){e.printStackTrace();}}/*** 关闭资源* @throws IOException*/@Overridepublic void close() throws IOException {}
}

创建 Calllog

/*** 通话日志*/
@TableRef("ct:calllog")
public class Calllog {@Rowkeyprivate String rowkey;@Column(family = "caller")private String call1;@Column(family = "caller")private String call2;@Column(family = "caller")private String calltime;@Column(family = "caller")private String duration;@Column(family = "caller")private String flg = "1";private String name;public String getFlg() {return flg;}public void setFlg(String flg) {this.flg = flg;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Calllog() {}public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Calllog(String data ) {String[] values = data.split("\t");call1 = values[0];call2 = values[1];calltime = values[2];duration = values[3];}public String getCall1() {return call1;}public void setCall1(String call1) {this.call1 = call1;}public String getCall2() {return call2;}public void setCall2(String call2) {this.call2 = call2;}public String getCalltime() {return calltime;}public void setCalltime(String calltime) {this.calltime = calltime;}public String getDuration() {return duration;}public void setDuration(String duration) {this.duration = duration;}
}

在ct.consumer.dao

创建HBaseDao


import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;import java.util.ArrayList;
import java.util.List;/*** @program: IntelliJ IDEA* @description: HBase数据访问对象* @author:* @create: 2022-10-21 16:41*/public class HBaseDao extends BaseDao {/*** 初始化*/public  void init() throws Exception{start();creatNamespaceNX(Names.NAMESPACE.getValue());createTableXX(Names.TABLE.getValue(),"com.lenovo.ct.consumer.coprocessor.InsertCalleeCoprocessor", ValueConstant.REGION_COUNT,Names.CF_CALLER.getValue(),Names.CF_CALLEE.getValue());end();}/*** 插入对象* @param log* @throws Exception*/public void insertData( Calllog log ) throws Exception {log.setRowkey(genRegionNum(log.getCall1(), log.getCalltime()) + "_" + log.getCall1() + "_" + log.getCalltime() + "_" + log.getCall2() + "_" + log.getDuration());putData(log);}/*** 插入数据* @param value*/public void insertData(String value) throws Exception{//将通话日志保存到Hbase表中//1.获取通话日志数据String[] values = value.split("\t");String call1 = values[0];String call2 = values[1];String calltime = values[2];String duration = values[3];//2.创建数据对象// rowkey设计// 1)长度原则//      最大值64KB,推荐长度为10 ~ 100byte//      最好8的倍数,能短则短,rowkey如果太长会影响性能// 2)唯一原则 : rowkey应该具备唯一性// 3)散列原则//      3-1)盐值散列:不能使用时间戳直接作为rowkey//           在rowkey前增加随机数//      3-2)字符串反转 :1312312334342, 1312312334345//           电话号码:133 + 0123 + 4567//      3-3) 计算分区号:hashMap// rowkey = regionNum + call1 + time + call2 + durationString rowkey = genRegionNum(call1, calltime) + "_" + call1 + "_" + calltime + "_" + call2 + "_" + duration + "_1";// 主叫用户Put put = new Put(Bytes.toBytes(rowkey));byte[] family = Bytes.toBytes(Names.CF_CALLER.getValue());put.addColumn(family, Bytes.toBytes("call1"), Bytes.toBytes(call1));put.addColumn(family, Bytes.toBytes("call2"), Bytes.toBytes(call2));put.addColumn(family, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));put.addColumn(family, Bytes.toBytes("duration"), Bytes.toBytes(duration));put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes("1"));String calleeRowkey = genRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 被叫用户
//        Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
//        byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));//3.保存数据List<Put> puts = new ArrayList<Put>();puts.add(put);
//        puts.add(calleePut);putData(Names.TABLE.getValue(), puts);}
}

在ct-consumer的resources

创建consumer.properties

bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=org.example
enable.auto.commit=true
auto.commit.interval.ms=1000

在ct-common.pom

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.3.1</version></dependency></dependencies>

在ct.common.api

创建Column

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {String family() default "info";String column() default  "";
}

创建Rowkey

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Rowkey {
}

创建TableRef

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TableRef {String value();
}

在ct.common.bean

创建Consumer


import java.io.Closeable;/*** @program: IntelliJ IDEA* @description: 消费者接口* * @create: 2022-10-21 15:39*/
public interface Consumer extends Closeable {/*** 生产数据* */public void consume();
}

创建BaseDao

/*** @program: IntelliJ IDEA* @description: 基础数据访问对象* @author: * @create: 2022-10-21 16:40*/import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;import java.lang.reflect.Field;
import java.util.*;public abstract class BaseDao  {private ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>();private ThreadLocal<Admin> adminHolder = new ThreadLocal<Admin>();protected void start() throws Exception {getConnection();getAdmin();}protected void end() throws Exception{Admin admin = getAdmin();if(admin != null){admin.close();adminHolder.remove();}Connection conn = getConnection();if(conn != null){conn.close();connHolder.remove();}}/*** 创建表,如果表已经存在,那么删除后在创建新的* @param name* @param families*/protected void createTableXX(String name ,String... families)throws Exception{createTableXX(name,null,null,families);}protected void createTableXX(String name ,String coprocessorClass,Integer regionCount,String... families)throws Exception{Admin admin = getAdmin();TableName tableName =TableName.valueOf(name);if (admin.tableExists(tableName)){//表存在,删除表deleteTable(name);}//创建表createTable(name,coprocessorClass,regionCount,families);}private void createTable(String name,String coprocessorClass,Integer regionCount,String... families) throws Exception{Admin admin = getAdmin();TableName tableName =TableName.valueOf(name);HTableDescriptor tableDescriptor =new HTableDescriptor(tableName);if ( families == null || families.length == 0){families = new String[1];families[0] = Names.CF_INFO.getValue();}for (String family : families) {HColumnDescriptor columnDescriptor =new HColumnDescriptor(family);tableDescriptor.addFamily(columnDescriptor);}if ( coprocessorClass != null && !"".equals(coprocessorClass) ) {tableDescriptor.addCoprocessor(coprocessorClass);}//增加预分区if(regionCount == null || regionCount <= 1){admin.createTable(tableDescriptor);}else{//分区键byte[][] splitKeys = getSplitKeys(regionCount);admin.createTable(tableDescriptor,splitKeys);}}/*** 获取查询时startrow, stoprow集合* @return*/protected  List<String[]> getStartStorRowkeys( String tel, String start, String end ) {List<String[]> rowkeyss = new ArrayList<String[]>();String startTime = start.substring(0, 6);String endTime = end.substring(0, 6);Calendar startCal = Calendar.getInstance();startCal.setTime(DateUtil.parse(startTime, "yyyyMM"));Calendar endCal = Calendar.getInstance();endCal.setTime(DateUtil.parse(endTime, "yyyyMM"));while (startCal.getTimeInMillis() <= endCal.getTimeInMillis()) {// 当前时间String nowTime = DateUtil.format(startCal.getTime(), "yyyyMM");int regionNum = genRegionNum(tel, nowTime);String startRow = regionNum + "_" + tel + "_" + nowTime;String stopRow = startRow + "|";String[] rowkeys = {startRow, stopRow};rowkeyss.add(rowkeys);// 月份+1startCal.add(Calendar.MONTH, 1);}return rowkeyss;}/*** 计算分区号(0, 1, 2)* @param tel* @param date* @return*/protected int genRegionNum( String tel, String date ) {// 13301234567String usercode = tel.substring(tel.length()-4);// 20181010120000String yearMonth = date.substring(0, 6);int userCodeHash = usercode.hashCode();int yearMonthHash = yearMonth.hashCode();// crc校验采用异或算法, hashint crc = Math.abs(userCodeHash ^ yearMonthHash);// 取模int regionNum = crc % ValueConstant.REGION_COUNT;return regionNum;}/*** 生成分区键* @param regionCount* @return*/private byte[][] getSplitKeys(int regionCount){int splitKeyCount = regionCount - 1;byte [][] bs = new byte[splitKeyCount][];// 0,1,2,3,4//(-无穷,0),[0,1),[1 ,+无穷)List<byte[]> bsList = new ArrayList<byte[]>();for( int i = 0;i <splitKeyCount;i++ ){String splitKey = i + "|";bsList.add(Bytes.toBytes(splitKey));}bsList.toArray(bs);return bs;}/*** 增加对象:自动封装数据,将对象数据直接保存到hbase中去* @param obj* @throws Exception*/protected void putData(Object obj) throws Exception {// 反射Class clazz = obj.getClass();TableRef tableRef = (TableRef)clazz.getAnnotation(TableRef.class);String tableName = tableRef.value();Field[] fs = clazz.getDeclaredFields();String stringRowkey = "";for (Field f : fs) {Rowkey rowkey = f.getAnnotation(Rowkey.class);if ( rowkey != null ) {f.setAccessible(true);stringRowkey = (String)f.get(obj);break;}}Connection conn = getConnection();Table table = conn.getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(stringRowkey));for (Field f : fs) {Column column = f.getAnnotation(Column.class);if (column != null) {String family = column.family();String colName = column.column();if ( colName == null || "".equals(colName) ) {colName = f.getName();}f.setAccessible(true);String value = (String)f.get(obj);put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));}}// 增加数据table.put(put);// 关闭表table.close();}/*** 增加多条数据* @param name* @param puts*/protected void putData( String name, List<Put> puts ) throws Exception {// 获取表对象Connection conn = getConnection();Table table = conn.getTable(TableName.valueOf(name));// 增加数据table.put(puts);// 关闭表table.close();}/*** 增加数据* @param name* @param put*/protected void putData(String name, Put put) throws Exception{//获取表的对象Connection conn = getConnection();Table table = conn.getTable(TableName.valueOf(name));//增加数据table.put(put);//关闭表table.close();}/*** 删除表格* @param name* @throws Exception*/protected void deleteTable(String name) throws Exception{TableName tableName =TableName.valueOf(name);Admin admin = getAdmin();admin.disableTable(tableName);admin.deleteTable(tableName);}/*** 创建命名空间,如果命名空间已经存在,不需要创建,否则,创建新的* @param namespace*/protected void creatNamespaceNX(String namespace) throws Exception{Admin admin = getAdmin();try{admin.getNamespaceDescriptor(namespace);}catch (NamespaceNotFoundException e){
//            e.printStackTrace();NamespaceDescriptor namespaceDescriptor =NamespaceDescriptor.create(namespace).build();admin.createNamespace(namespaceDescriptor);}}/*** 获取连接对象*/protected synchronized Connection getConnection() throws Exception {Connection conn = connHolder.get();if( conn == null ){Configuration conf = HBaseConfiguration.create();conn = ConnectionFactory.createConnection(conf);connHolder.set(conn);}return conn;}/*** 获取连接对象*/protected synchronized Admin getAdmin() throws Exception {Admin admin = adminHolder.get();if( admin == null ){admin = getConnection().getAdmin();adminHolder.set(admin);}return admin;}}

在ct.common.constant

创建ConfigConstant

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;/*** @program: IntelliJ IDEA* @description: ming* @author: * @create: 2022-10-22 20:21*/
public class ConfigConstant {private static Map<String,String> valueMap = new HashMap<String,String>();static {//国际化ResourceBundle ct = ResourceBundle.getBundle("ct");Enumeration<String> enums = ct.getKeys();while( enums.hasMoreElements()){String key = enums.nextElement();String value = ct.getString(key);valueMap.put(key,value);}}public static String getVal(String key){return valueMap.get(key);}public static void main(String[] args){System.out.println(ConfigConstant.getVal("ct.cf.caller"));}}

创建Names

/*
* 名称常量枚举类
* */
public enum Names implements Val {NAMESPACE("ct"),TABLE("ct:calllog"),CF_CALLER("caller"),CF_CALLEE("callee"),CF_INFO("info"), TOPIC("ct");private String name;private Names(String name){this.name = name;}@Overridepublic void setValue(Object val) {this.name = (String) val;}@Overridepublic String getValue() {return name;}
}

创建 constant

public class ValueConstant {public static final Integer REGION_COUNT = 6;
}

在ct-common的resources

将hdfs-site.xml、core-site.xml、hbase-site.xml、log4j.properties放置于resources目录

创建hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License. See accompanying LICENSE file.
--><!-- Put site-specific property overrides in this file. --><configuration><!-- 指定数据冗余份数 --><property><name>dfs.replication</name><value>1</value></property><!-- 关闭权限检查--><property><name>dfs.permissions.enable</name><value>false</value></property><property><name>dfs.namenode.secondary.http-address</name><value>hadoop03:50090</value></property><property><name>dfs.namenode.http-address</name><value>hadoop01:50070</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property>
</configuration>

创建core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License. See accompanying LICENSE file.
--><!-- Put site-specific property overrides in this file. --><configuration><property><name>fs.defaultFS</name><value>hdfs://hadoop01:9000</value></property><property><name>hadoop.tmp.dir</name><value>/opt/module/hadoop-2.7.2/data/tmp</value></property><property><name>hadoop.proxyuser.admin.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.admin.groups</name><value>*</value></property></configuration>

创建hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
-->
<configuration><property><name>hbase.rootdir</name><value>hdfs://hadoop01:8020/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 --><property><name>hbase.master.port</name><value>60000</value></property><property><name>hbase.zookeeper.quorum</name><value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value></property><property><name>hbase.zookeeper.property.dataDir</name><value>/opt/module/zookeeper-3.4.10/zkData</value></property></configuration>

导入log4j.properties

创建ct.properties

ct.namespace = ct
ct.table = ct:calllog
ct.topic=ct
ct.cf.caller=caller
ct.cf.info = info

在ct-consumer-coprocessor

pom

<artifactId>ct-consumer-coprocessor</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.example</groupId><artifactId>ct-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>

四、数据消费方案优化

现在我们要使用

使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。

思路:

a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)

b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。

c) 重新创建hbase表,并设置为该表设置协处理器。

d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包

e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件

编码:

1) 新建协处理器类:InsertCalleeCoprocessor,并覆写postPut方法,该方法会在数据成功插入之后被回调。

创建InsertCalleeCoprocessor

package com.ct.consumer.coprocessor;import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;/**** 使用协处理器保存被叫用户的数据** 协处理器的使用* 1. 创建类* 2. 让表找到协处理类(和表有关联)* 3. 将项目打成jar包发布到hbase中(关联的jar包也需要发布),并且需要分发*/
public class InsertCalleeCoprocessor extends BaseRegionObserver {// 方法的命名规则// login// logout// prePut// doPut :模板方法设计模式//    存在父子类://    父类搭建算法的骨架//    1. tel取用户代码,2时间取年月,3,异或运算,4 hash散列//    子类重写算法的细节//    do1. tel取后4位,do2,201810, do3 ^, 4, % &// postPut/*** 保存主叫用户数据之后,由Hbase自动保存被叫用户数据* @param e* @param put* @param edit* @param durability* @throws IOException*/@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );// 关闭表table.close();}}private class CoprocessorDao extends BaseDao {public int getRegionNum(String tel, String time) {return genRegionNum(tel, time);}}
}

打包jar

1.打包jar的依赖到在linux下测试

把图中的jar导入到/opt/module/hbase/lib/

重新编译项目,发布jar包到hbase的lib目录下(注意需群发):

关闭hbase并且重新启动

bin/stop-hbase.sh
bin/start-hbase.sh

在hbase里面查看有文件生成

 bin/hbase shell
hbase(main):001:0> scan 'ct:calllog'

五、数据消费测试

项目成功后,则将项目打包后在linux中运行测试。

1. 打包HBase消费者代码

a) 在windows中,进入工程的pom.xml所在目录下(建议将该工程的pom.xml文件拷贝到其他临时目录中,例如我把pom.xml文件拷贝到了F:\maven-lib\目录下),然后使用mvn命令下载工程所有依赖的jar包

mvn -DoutputDirectory=./lib -DgroupId=com.atguigu -DartifactId=ct_consumer –

Dversion=0.0.1-SNAPSHOT dependency:copy-dependencies

b) 使用maven打包工程

c) 测试执行该jar包

方案一:推荐,使用*通配符,将所有依赖加入到classpath中,不可使用*.jar的方式。

注意:如果是在Linux中实行,注意文件夹之间的分隔符。自己的工程要单独在cp中指定,不要直接放在maven-lib/lib目录下。

java -cp F:\maven-lib\ct_consumer-0.0.1-SNAPSHOT.jar;F:\maven-lib\lib\*

com.atguigu.ct_consumer.kafka.HBaseConsumer

方案二:最最推荐,使用java.ext.dirs参数将所有依赖的目录添加进classpath中。

注意:-Djava.ext.dirs=属性后边的路径不能为”~”

java -Djava.ext.dirs=F:\maven-lib\lib\ -cp F:\maven-lib\ct_consumer-0.0.1-

SNAPSHOT.jar com.atguigu.ct_consumer.kafka.HBaseConsumer

2.在hbase的页面也看到文件出现

3.hbase节点挂了,可以查看日志文件,找出错误出现

大数据电信客服-数据采集/消费(二)相关推荐

  1. 大数据 电信客服项目

    1.项目背景 通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录, 第三方服务资费等等繁多信息.数据量如此巨大,除了要满足用户的实时查询和展示之外, 还需要定时定期的对已有数据进 ...

  2. 大数据电信客服-数据分析(三)

    目录 一.需求分析 二.Mysql表结构设计 三.环境准备 四.需求实现 ct.analysis ct.analysis.io ct.analysis.kv ct.analysis.mapper ct ...

  3. 大数据电信客服项目一——数据生成

    源码,资料自取:链接:https://pan.baidu.com/s/1jWcfux9vONLio5LboLSxSQ                    提取码:6666 说明:数据生成采用面向接口 ...

  4. 大数据项目之_15_电信客服分析平台_0102_项目背景+项目架构+项目实现+数据生产+数据采集/消费(存储)

    大数据项目之_15_电信客服分析平台_01&02 一.项目背景 二.项目架构 三.项目实现 3.1.数据生产 3.1.1.数据结构 3.1.2.编写代码 3.1.3.打包测试 3.2.数据采集 ...

  5. 实战▍一个完整的电信客服分析平台大数据项目:架构、实现、数据

    作者|黑泽明军  编辑|丹顶鹤5号 电信客服分析平台(附代码) 编者按: 很难见到这种一个完整的大数据项目,从项目背景.项目架构到项目实现都有大量的实例,包括数据存储和数据采集和各个模块的运行设置等等 ...

  6. 视频教程-新版全面系统完整的电信客服综合案例教程-大数据

    新版全面系统完整的电信客服综合案例教程 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰 ...

  7. 【大数据开发必看】项目一 电信客服

    电信客服 需求: 统计每天.每月以及每年的每个人的通话次数及时长 项目架构: 生产数据(ProduceLog) 随机生成电话号(主被叫) 随机生成通话建立时间 随机生成通话时长(30min内) 生成日 ...

  8. 精准大数据获客——移动 联通 电信运营商大数据分析_营销

    目前,移动.联通.电信三大运营商都在加速进行大数据能力建设.完善和丰富大数据的应用模式和基础架构.在大数据时代,企业的销售和营销渠道已由传统模式转为向大数据营销模式,大数据营销模式更顺应时代的变化和发 ...

  9. 2013华为工作之电信客服上线

    时间过得很快,不知觉,6个月时间已过,开发的项目终于要上线了,刚来的时候项目经理说这个项目是3个月之后要上线的.上线那一天,是我第一次为了工作而熬夜,就这样没了,第一次总让人那么深刻,让人那么难忘记. ...

最新文章

  1. C4D样条曲线建模大师班 Cinema 4D MasterClass: Master Modelling using Splines
  2. 含有“外骨骼”的电影和游戏
  3. Codeforces Round #431 (Div. 2)
  4. matlab实现图像放大两倍,matlab图像处理基础知识0(双线性插值matlab实现--调整水平和垂直放大倍数)...
  5. 深度学习 相机标定_基于深度学习的多传感器标定
  6. 目标检测——对数据进行EDA分析的学习笔记
  7. win7工作组无法查看计算机名,WIN7下无法查看工作组计算机怎么办
  8. 【NOIP2014】【Luogu2118】比例简化(枚举)
  9. Jmeter在Linux下的运行测试
  10. dota5显示正在连接协调服务器,win10系统打开dota2提示已连接至DOTA2游戏协调服务器正在登陆中如何解决...
  11. 10.5 Vue电商后台管理完善--订单详情页面显示商品信息,添加备注
  12. 创建相册,批量删除,图片预览,上传图片
  13. ipad iphone开发_如何在iPhone或iPad上删除电子邮件
  14. 数据结构课设----运动会分数统计系统
  15. SLAM学习——李群与李代数
  16. STM32学习笔记 | 引起电源和系统异常复位的原因
  17. 工业机器人在线示教编程和离线编程
  18. PDF转Word文字可编辑的软件有哪些?
  19. EarlyStopping技术
  20. 3.前端开发就业前景

热门文章

  1. 连续邮资问题-回溯法
  2. ajax开发 短期班,一个简单的ajax 来mock数据
  3. 从一线开发到技术总监,你就差一个赶鸭子上架
  4. mysql select详解_MySQL数据库 select 数据查询详解
  5. Windbg分析dump崩溃
  6. 关于CBB电容,独石电容,电解电容
  7. 转载--软件架构模式
  8. JAVA练习:创建圆柱体类Cylinder
  9. 开发中常见的架构模式
  10. linux添加loopback,教你CentOS7添加本地回环地址