一、准备环境
1、下载jdk-8u45-windows-x64.exe 安装于D:\Java8
2、修改JAVA_HOME为

D:\Java8\jdk1.8.0_45

3、修改HADOOP_HOME为

D:\hadoop272

4、搭建Hadoop102 Hadoop103 Hadoop104 三台centos6.5虚拟机
需修改hosts文件

192.168.198.134 Hadoop102
192.168.198.135 Hadoop103
192.168.198.136 Hadoop104

/etc/sysconfig/networks文件

NETWORKING=yes
HOSTNAME=Hadoop102

/etc/sysconfig/network-scripts/ifcfg-eth0文件

DEVICE="eth0"
BOOTPROTO="static"
NM_CONTROLLED="yes"
ONBOOT="yes"
TYPE="Ethernet"
UUID="2dc126cb-ef2a-412e-a373-45fbe1829354"
IPADDR=192.168.198.134
GATEWAY=192.168.198.2
NETMASK=255.255.255.0
DNS1=192.168.198.2
DNS2=114.114.114.114
DNS3=8.8.8.8

5、配置三台主机的登录秘钥
6、上传jdk1.8.0_45 至 /usr/java
7、上传hadoop272至 /opt/module2
8、配置hadoop 的6个文件
9、上传hive flume kafka zookeeper 至/opt/modules/ap 并解压
10、配置环境变量/etc/profile

export JAVA_HOME=/usr/java/jdk1.8.0_45
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/libexport HADOOP_HOME=/opt/modules/ap/hadoop272
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOOME/sbin:$HADOOP_HOME/lib:$HADOOP_HOME/lib/native
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export HDFS_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export YARN_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoopexport KAFKA_HOME=/opt/modules/ap/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin:$KAFKA_HOME/libsexport ZOOKEEPER_HOME=/opt/modules/ap/zookeeper-3.4.10
export PATH=$PATH:$ZOOKEEPER_HOME/binexport HIVE_HOME=/opt/modules/ap/apache-hive-1.2.1-bin
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/libexport FLUME_HOME=/opt/modules/ap/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin

11、安装配置zookeeper、hive、kafka、flume详见本人相关文章以下仅写出由于版本问题需要改动或补充的部分

12、此处由于hive版本较高,需要执行

schematool -dbType mysql -initSchema

命令手动初始化mysql数据库

否则会报错

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

13、另外 配置metaserver后 使用hive前先后台启动metaserver

hive --service metastore &

14、win10安装idea
下载idea 2019.1
15、修改c:\Windows\System32\drivers\etc\hosts文件,添加

 0.0.0.0 account.jetbrains.com0.0.0.0 www.jetbrains.com

16、下载JetbrainsIdesCrack 4.2 jar
放到idea 2019的安装目录 bin下
修改 idea.exe.vmoptions和 idea64.exe.vmoptions
在末尾添加包的路径

-javaagent:D:\Javasoft\IntelliJ IDEA 2019.1.3\bin\jetbrains-agent.jar

17、打开idea2019,在activatecode 输入下载的注册码
18、打开D:\idea project\app_statistics_project 项目

二、项目概要
统计 手机app 的活跃度
关键词:开发商(租户)、app 、SDK、用户、活跃用户、沉默用户、忠诚用户、流失用户、留存用户、用户新鲜度、启动次数、单次使用时长、日使用时长
功能模块:
1)实现收集手机APP日志
2)定期离线分析业务指标
3)数据展示
流程:
1)手机APP启动时,上报启动日志、错误日志、页面日志、事件日志、使用时长日志等信息到日志收集服务器。
2)日志收集服务器将收集到的日志信息发送给kafka。
3)Flume分别消费kafka中的5种主题信息,并把数据存储到HDFS上。
4)通过crontab任务调度定时把HDFS中的信息拷贝到Hive数据仓库中。
5)核心业务操作采用Hive查询。
6)查询结果通过数据展示平台展示。
三、项目工程结构
项目:app_statistics_project
工程1:app_logs_common 提供实体类和地理信息的获取
–AppBaseLog 日志消息基类
创建时间、应用标识、租户标识、设备标识、应用版本、安装渠道、操作系统、系统版本、机型
–AppErrorLog 错误日志
错误摘要、错误详情
–AppEventLog 事件日志
事件标识、持续时长、参数(键值)
–AppPageLog 页面日志
页面标识、访问顺序、下个页面、停留时长
–AppStartupLog 启动日志
国家、省份、ip地址、网络运营商、品牌、分辨率
–AppUsageLog 时长日志
单次使用时长、单次上传流量、单次下载流量
–AppLogEntity 日志集合
private AppStartupLog[] appStartupLogs;
private AppPageLog[] appPageLogs;
private AppEventLog[] appEventLogs;
private AppUsageLog[] appUsageLogs;
private AppErrorLog[] appErrorLogs;
–GeoInfo 地理信息
country 、province
–GeoUtil 地理信息工具
加载国家数据
public static String getCountry(String ip)
加载省份数据
public static String getProvince(String ip)

工程2、app_logs_client 手机客户端工程,模拟生日生成
–GenerateData 数据生成类
日志创建时间、应用唯一标识、租户唯一标识、设备唯一标识、版本、渠道、操作系统、系统版本、机型

          日志信息初始值或随机值定义
    private static Long[] createdAtMsS = initCreatedAtMs();//日志创建时间private static String appId = "sdk34734";//应用唯一标识private static String[] tenantIds = {"cake"};//租户唯一标识,企业用户private static String[] deviceIds = initDeviceId();//设备唯一标识private static String[] appVersions = {"3.2.1", "3.2.2"};//版本private static String[] appChannels = {"youmeng1", "youmeng2"};//渠道,安装时就在清单中制定了,appStore等。private static String[] appPlatforms = {"android", "ios"};//平台private static String[] osTypes = {"8.3", "7.1.1"};//操作系统private static String[] deviceStyles = {"iPhone 6", "iPhone 6 Plus", "红米手机1s"};//机型
     //初始化设备idprivate static String[] initDeviceId()//初始化创建时间private static Long[] initCreatedAtMs()
//启动日志属性值private static String[] countrys = {"America", "china"};//国家,终端不用上报,服务器自动填充该属性private static String[] provinces = {"Washington", "jiangxi", "beijing"};//省份,终端不用上报,服务器自动填充该属性private static String[] networks = {"WiFi", "CellNetwork"};//网络private static String[] carriers = {"中国移动", "中国电信", "EE"};//运营商private static String[] brands = {"三星", "华为", "Apple", "魅族", "小米", "锤子"};//品牌private static String[] screenSizes = {"1136*640", "960*640", "480*320"};//分辨率
//事件日志属性值private static String[] eventIds = {"popMenu", "autoImport", "BookStore"}; //事件唯一标识private static Long[] eventDurationSecsS = {new Long(25), new Long(67), new Long(45)};//事件持续时长static Map<String, String> map1 = new HashMap<String, String>() {{put("testparam1key", "testparam1value");put("testparam2key", "testparam2value");}};static Map<String, String> map2 = new HashMap<String, String>() {{put("testparam3key", "testparam3value");put("testparam4key", "testparam4value");}};private static Map[] paramKeyValueMapsS = {map1, map2};//参数名/值对
//使用时长日志属性值private static Long[] singleUseDurationSecsS = initSingleUseDurationSecs();//单次使用时长(秒数),指一次启动内应用在前台的持续时长// 单次使用时长private static Long[] initSingleUseDurationSecs()
// 错误日志属性值private static String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};        //错误摘要private static String[] errorDetails = {"java.lang.NullPointerException\\n    " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};        //错误详情
// 页面使用情况日志属性值private static String[] pageIds = {"list.html", "main.html", "test.html"};//页面idprivate static int[] visitIndexs = {0, 1, 2, 3, 4};//访问顺序号,0为第一个页面private static String[] nextPages = {"list.html", "main.html", "test.html", null}; //下一个访问页面,如为空则表示为退出应用的页面private static Long[] stayDurationSecsS = {new Long(45), new Long(2), new Long(78)};//当前页面停留时长
       生成模拟数据数组
    // 初始化五类log的数据//启动相关信息的数组private static AppStartupLog[] appStartupLogs = initAppStartupLogs();//页面跳转相关信息的数组private static AppPageLog[] appPageLogs = initAppPageLogs();//事件相关信息的数组private static AppEventLog[] appEventLogs = initAppEventLogs();//app使用情况相关信息的数组private static AppUsageLog[] appUsageLogs = initAppUsageLogs();//错误相关信息的数组private static AppErrorLog[] appErrorLogs = initAppErrorLogs();
// 初始化每类log的公共属性值private static void initLogCommon(AppBaseLog baselog){// 日志创建时间baselog.setCreatedAtMs(System.currentTimeMillis());// appidbaselog.setAppId(appId);// 租户唯一标识,企业用户String tenantId = tenantIds[random.nextInt(tenantIds.length)];if (tenantId != null) {baselog.setTenantId(tenantId);}baselog.setTenantId(tenantIds[random.nextInt(tenantIds.length)]);// 设备唯一标识baselog.setDeviceId(deviceIds[random.nextInt(deviceIds.length)]);// 版本baselog.setAppVersion(appVersions[random.nextInt(appVersions.length)]);// 渠道baselog.setAppChannel(appChannels[random.nextInt(appChannels.length)]);// 平台baselog.setAppPlatform(appPlatforms[random.nextInt(appPlatforms.length)]);// 操作系统baselog.setOsType(osTypes[random.nextInt(osTypes.length)]);// 机型baselog.setDeviceStyle(deviceStyles[random.nextInt(deviceStyles.length)]);}
   各类日志的init函数
    // 启动相关信息的数组private static AppStartupLog[] initAppStartupLogs()// 页面相关信息的数组private static AppPageLog[] initAppPageLogs()// 事件相关信息的数组private static AppEventLog[] initAppEventLogs()// app使用情况相关信息的数组private static AppUsageLog[] initAppUsageLogs() // 错误相关信息的数组private static AppErrorLog[] initAppErrorLogs()
     发送数据
// 循环发送数据public static void main(String[] args)

–UploadUtil 日志上传工具类

//上传日志
public static void upload(String json)URL url = new URL("http://hadoop102:8080/app_logs/coll/index");

工程3、app_logs_collect_web 网站端 spring mvc网站,浏览数据

–日志收集类
@Controller
@RequestMapping("/coll")
public class CollectLogController

 //接收发送过来的数据public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req)
// 修正时间
private void verifyTime(AppLogEntity e, HttpServletRequest req)
//修正ip client地址
private void processIp(AppLogEntity e, HttpServletRequest req)
// 缓存地址信息
private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();

设置kafka主题

public class Constants {//主题public static final String TOPIC_APP_STARTUP = "topic_app_startup" ;public static final String TOPIC_APP_ERRROR = "topic_app_error" ;public static final String TOPIC_APP_EVENT = "topic_app_event" ;public static final String TOPIC_APP_USAGE = "topic_app_usage" ;public static final String TOPIC_APP_PAGE = "topic_app_page" ;
}
// 发送消息给发Kafka
private void sendMessage(AppLogEntity e)
//发送单个的log消息给kafka
private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs)

工程4、app_logs_flume 创建Flume拦截器,区分kafka传递过来的日志类型

类:public class LogCollInterceptor implements Interceptor

   //将flume事件解析成五个类别public Event intercept(Event event)

配置Flume

a1.sources=r1
a1.channels=c1
a1.sinks=k1a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hxzy.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_pagea1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = DataStreama1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

工程5、HIVE数据处理

分区表
tab_name
ext_error_logs
ext_event_logs
ext_page_logs
ext_startup_logs
ext_usage_logs

类:DateUtil

java date 处理函数

//根据输入的时间信息,返回当天的起始时间;
public static Date getDayBeginTime(Date d)
public static Date getDayBeginTime(Date d, int offset)
//根据输入的时间信息,返回本周的起始时间;
public static Date getWeekBeginTime(Date d)
public static Date getWeekBeginTime(Date d, int offset)
//根据输入的时间信息,返回本月的起始时间;
public static Date getMonthBeginTime(Date d)
public static Date getMonthBeginTime(Date d, int offset)

UDF函数
类:
class DayBeginUDF extends UDF

     // 计算现在的起始时刻(毫秒数)public long evaluate() throws ParseException {return evaluate(new Date());}
   // 指定天偏移量public long evaluate(int offset) throws ParseException {return evaluate(DateUtil.getDayBeginTime(new Date(), offset));}
   // 计算某天的起始时刻,日期类型(毫秒数)public long evaluate(Date d) throws ParseException {return DateUtil.getDayBeginTime(d).getTime();}
   // 计算某天的起始时刻,日期类型,带偏移量(毫秒数)public long evaluate(Date d, int offset) throws ParseException {return DateUtil.getDayBeginTime(d, offset).getTime();}
   // 计算某天的起始时刻,String类型(毫秒数)public long evaluate(String dateStr) throws ParseException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");Date d = sdf.parse(dateStr);return evaluate(d);}
   // 计算某天的起始时刻,String类型,带偏移量(毫秒数)public long evaluate(String dateStr, int offset) throws ParseException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");Date d = sdf.parse(dateStr);return DateUtil.getDayBeginTime(d, offset).getTime();}
   // 计算某天的起始时刻,String类型,带格式化要求(毫秒数)public long evaluate(String dateStr, String fmt) throws ParseException {SimpleDateFormat sdf = new SimpleDateFormat(fmt);Date d = sdf.parse(dateStr);return DateUtil.getDayBeginTime(d).getTime();}
   // 计算某天的起始时刻,String类型,带格式化,带偏移量(毫秒数)public long evaluate(String dateStr, String fmt, int offset) throws ParseException {SimpleDateFormat sdf = new SimpleDateFormat(fmt);Date d = sdf.parse(dateStr);return DateUtil.getDayBeginTime(d, offset).getTime();}
}

类:public class WeekBeginUDF extends UDF
类:public class MonthBeginUDF extends UDF
类:public class FormatTimeUDF extends UDF

SQL:
新增用户统计:
日新增用户

select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;

月新增用户

select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getmonthbegin() and mintime < getmonthbegin(1)
)t ;

日活跃用户数

select
count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getdaybegin() and createdatms < getdaybegin(1);

一周内,每天的日活跃数

select
formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getweekbegin() and createdatms < getweekbegin(1)
group by formattime(createdatms,'yyyy/MM/dd');

连续3周活跃用户

select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where appid = 'sdk34734'
and concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd')
group by deviceid

沉默用户数

select
count(*)
from
(select deviceid , count(createdatms) dcount,min(createdatms) dmin
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having dcount = 1 and dmin < getdaybegin(-1)
)t;

工程6、web可视化

配置spring mvc beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-4.3.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-4.3.xsd"><!-- 扫描service包 --><context:component-scan base-package="com.hxzy.applogs.visualize.service" /><!-- 连接hive数据源 --><bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"><property name="driverClass" value="org.apache.hive.jdbc.HiveDriver" /><property name="jdbcUrl" value="jdbc:hive2://192.168.1.103:10000/applogsdb" /><property name="user" value="hxzy" /><property name="password" value="" /></bean><bean id="sqlSessionFactoryBean" class="org.mybatis.spring.SqlSessionFactoryBean"><property name="dataSource" ref="dataSource" /><property name="configLocation" value="classpath:mybatis-config.xml" /></bean><bean id="statMapper" class="org.mybatis.spring.mapper.MapperFactoryBean"><property name="mapperInterface" value="com.hxzy.applogs.visualize.dao.StatMapper"></property><property name="sqlSessionFactory" ref="sqlSessionFactoryBean"></property></bean>
</beans>

配置 mybetis

mybatis-config.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configurationPUBLIC "-//mybatis.org//DTD Config 3.0//EN""http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration><mappers><mapper resource="StatMapper.xml" /></mappers>
</configuration>

StatMapper.xml

<mapper namespace="com.hxzy.applogs.visualize.dao.StatMapper"><!-- 查询新增用户 --><select id="findNewUsers" resultMap="rm_StatBean">select count(*) stcount from ext_startup_logs</select><resultMap id="rm_StatBean" type="com.hxzy.applogs.visualize.domain.StatBean"><result column="stcount" property="count" /></resultMap><select id="findThisWeekNewUsers" resultMap="rm_weekUser">select formattime(t.mintime,'yyyy/MM/dd') stdate , count(*) stcountfrom (select deviceid,min(createdatms) mintimefrom ext_startup_logswhere appid = #{appid} group by deviceid having mintime &gt;= getweekbegin() and mintime &lt; getweekbegin(1)) tgroup by formattime(t.mintime,'yyyy/MM/dd')</select><resultMap id="rm_weekUser" type="com.hxzy.applogs.visualize.domain.StatBean"><result column="stcount" property="count" /><result column="stdate" property="date" /></resultMap>
</mapper>

类:StatController

//统计每周每天新增用户数public Map<String, Object> stat3()

类2:统计服务

@Service("statService")
public class StatServiceImpl implements StatService

三、调试client、collect_web、flume拦截器3个工程

1、app_log_collect_web
调试问题1:

    未找到包  xxxx

解决:pom.xml 为正确配置 app_logs_commen模块,名称错误

调试问题2:

Error:(31, 46) java: 找不到符号符号:   类 AppLogEntity位置: 类 com.atguigu.applogs.collect.web.controller.CollectLogController

解决:依据文档添加该类

2、app_log_client --> generatedata
调试问题1:

Error:java: Compilation failed: internal java compiler error

解决:将项目和工程的language level 统一改为 7 diamonds

调试问题2:找不到方法setSingleUseDurationSecs
解决:依据文档将缺失的方法添加到项目中

调试问题3:

java.net.ConnectException: Connection timed out: connect

未能连接网络服务器
解决:将目标链接改为本地

URL url = new URL("http://localhost:8080/coll/index");// 测试地址

调试问题4:
接收到的响应码始终未404
调试方法:
在CollectLogController 的 collect action中 添加参数接收的控制台输出

System.out.println("logEntity.setAppStartupLogs");

在GenerateData 的 main 方法中添加控制台输出对方法执行实施监控

System.out.println("logEntity.setAppStartupLogs");
AppStartupLog[] a = new AppStartupLog[]{appStartupLogs[random.nextInt(appStartupLogs.length)]};
logEntity.setAppStartupLogs(a);
System.out.println(a[0].getCountry());

发送json

{"appErrorLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","createdAtMs":1597834893609,"deviceId":"device2265","deviceStyle":"红米手机1s","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)","errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","osType":"8.3","tenantId":"cake"}],"appEventLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device2222","deviceStyle":"红米手机1s","eventDurationSecs":45,"eventId":"autoImport","osType":"8.3","paramKeyValueMap":{"testparam3key":"testparam3value","testparam4key":"testparam4value"},"tenantId":"cake"}],"appPageLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.1","createdAtMs":1597834893608,"deviceId":"device2248","deviceStyle":"红米手机1s","nextPage":"list.html","osType":"8.3","pageId":"main.html","stayDurationSecs":45,"tenantId":"cake","visitIndex":2}],"appStartupLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","brand":"小米","carrier":"EE","country":"America","createdAtMs":1597834893608,"deviceId":"device2274","deviceStyle":"iPhone 6 Plus","network":"WiFi","osType":"7.1.1","province":"Washington","screenSize":"480*320","tenantId":"cake"}],"appUsageLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device224","deviceStyle":"iPhone 6 Plus","osType":"7.1.1","singleUseDurationSecs":93,"tenantId":"cake"}]}
System.out.println(json);

解决:数据生成正常,action端接收不到
1、将url改为:

http://localhost:8080/app_logs_collect_web_war_exploded/coll/index

通过控制台输出发现,程序在向kafka发送数据时,发生异常,由于虚拟机服务器未启动,因此情况正常,可以暂时注释掉向kafka发送数据的步骤

//sendMessage(e);


app_logs_client 工程 调试完成。

3、配置kafka集群的server.properties

delete.topic.enable=true
broker.id=0
log.dirs=/opt/modules/ap/kafka_2.11-0.11.0.0/logs
zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181
num.recovery.threads.per.data.dir=1
num.partitions=1
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
num.io.threads=8
num.network.threads=3

在三台服务器依次启动kafka服务器

/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /opt/modules/ap/kafka_2.11-0.11.0.0/config/server.properties &

创建主题

/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_startup
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_error
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_event
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_usage
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_page

创建控制台 topic_app_startup主题 消费者

/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper Hadoop102:2181 --topic topic_app_startup

问题1:启动新的consumer后,发生异常
问题解决:未配置consumer.properties

zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181

问题2:controller消息发送失败
解决:未配置producer.properties

bootstrap.servers=Hadoop102:9092,Hadoop103:9092,Hadoop104:9092

问题3:使用kafka-server-stop.sh 脚本关闭kafka失败
解决:编辑此脚本,指定kafka版本,“kafka.kafka” =>“kafka_2.11-0.11.0.0”

修改三个问题后,consumer接收app_logs_start消息正常

4、将调试后的app_logs_collect_web 打成war包 部署到 hadoop102的tomcat
检查是否pom文件中包含war属性
—打包之前首先clean项目

问题1:app_logs_common.jar找不到
解决:首先打包app_logs_common jar包,并且执行install操作
问题2:could not copy GeoLite2-City.mmdb 。。。请求的操作无法在使用用户映射区域打开的文件上执行
解决:打包前需要停止正在运行的项目,关闭后打包成功

将war包复制到Hadoop102的/usr/tomcat/webapps目录下

开启tomcat /usr/tomcat/bin/

修改app_logs_client工程中的UploadUtil中的请求地址为

http://hadoop102:8080/app_logs_collect_web/coll/index

5、flume接收kafka5个主题转存hdfs,并使用拦截器区分存储路径
ps: 此处也可使不使用拦截器而开动5个flume-agent分别接收5个不同主题但为节省维护成本,可以创建flume拦截器LogCollInterceptor类,用于在Event header中增加logtype键值,主要实现intercept函数如下:

public class LogCollInterceptor implements Interceptor {.../*** Modifies events in-place.*/public Event intercept(Event event) {// 1获取flume接收消息头Map<String, String> headers = event.getHeaders();// 2获取flume接收的json数据数组byte[] json = event.getBody();// 将json数组转换为字符串String jsonStr = new String(json);// pageLogString logType = "" ;if(jsonStr.contains("pageId")){logType = "page" ;}// eventLogelse if (jsonStr.contains("eventId")) {logType = "event";}// usageLogelse if (jsonStr.contains("singleUseDurationSecs")) {logType = "usage";}// errorelse if (jsonStr.contains("errorBrief")) {logType = "error";}// startupelse if (jsonStr.contains("network")) {logType = "startup";}// 3将日志类型存储到flume头中headers.put("logType", logType);// 简易调试: 在窗口输出json和logTypeSystem.out.println("json:"+json.toString());System.out.println("logType:"+logType);return event;}}

打包app_logs_flume工程为app_logs_flume_1.0.jar,拷贝到
/opt/modules/ap/apache-flume-1.7.0-bin/lib文件夹下面作为拦截器class

配置 flume-applog-conf.properties 配置kafka数据源、拦截器

a1.sources=r1
a1.channels=c1
a1.sinks=k1a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_pagea1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/usr/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = DataStreama1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

启动flume agent脚本

/opt/modules/ap/apache-flume-1.8.0-bin/bin/flume-ng agent  -f /opt/modules/ap/apache-flume-1.8.0-bin/conf/flume-applog-conf.properties -n a1




四、调试hive、visulize两个项目

1、修改hive配置,支持json,压缩模式改为不压缩
(1)将json-serde-1.3.8-jar-with-dependencies.jar导入到hive的/opt/module/hive/lib
(2)在/opt/module/hive/conf/hive-site.xml文件中添加如下配置

 <property><name>hive.aux.jars.path</name><value>/opt/modules/ap/apache-hive-1.2.1-bin/lib/json-serde-1.3.8-jar-with-dependencies.jar</value></property> <property><name>hive.exec.compress.output</name><value>false</value></property>

2、创建applogs_db数据库

drop database applogs_db;
create database applogsdb;
use applogsdb;

3、创建外部分区表

--startup
CREATE external TABLE ext_startup_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,country string,province string,ipAddress string,network string,carrier string,brand string,screenSize string)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;--error
CREATE external TABLE ext_error_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,errorBrief string,errorDetail string)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;--event
CREATE external TABLE ext_event_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,eventId string,eventDurationSecs bigint,paramKeyValueMap Map<string,string>)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;--page
CREATE external TABLE ext_page_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,pageViewCntInSession int,pageId string,visitIndex int,nextPage string,stayDurationSecs bigint)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;--usage
CREATE external TABLE ext_usage_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,singleUseDurationSecs bigint,singleUploadTraffic bigint,singleDownloadTraffic bigint)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;

4、编程hive定时脚本 hdfstohivebyminute.sh ,每隔一分钟将HDFS上的数据,导入到Hive对应分区一次

#!/bin/bash
systime=`date -d "-3 minute" +%Y%m-%d-%H%M`
ym=`echo ${systime} | awk -F '-' '{print $1}'`
day=`echo ${systime} | awk -F '-' '{print $2}'`
hm=`echo ${systime} | awk -F '-' '{print $3}'`
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/startup/${ym}/${day}/${hm}' into table applogsdb.ext_startup_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/error/${ym}/${day}/${hm}' into table applogsdb.ext_error_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/event/${ym}/${day}/${hm}' into table applogsdb.ext_event_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/usage/${ym}/${day}/${hm}' into table applogsdb.ext_usage_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/page/${ym}/${day}/${hm}' into table applogsdb.ext_page_logs partition(ym='${ym}',day='${day}',hm='${hm}')"

将文件格式化为UNIX格式

yum install -y dos2unix
dos2unix /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohivebyminute.sh

编写Linux调度列表 /etc/crontab 添加

* * * * * source /etc/profile; /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohive.sh
chmod -R 777 /opt/modules/ap/apache-hive-1.2.1-bin/source /etc/crontab
service crond start

查看是否有数据进入hive

use applogsdb;
select * from ext_startup_logs;


5、编写udf函数

6、导出udf函数 jar包 app_log_hive.jar 到hive/lib

在当前进程中添加jar包

hive>add jar /opt/modules/ap/apache-hive-1.2.1-bin/lib/app_logs_hive.jar;

在 hive-site.xml中添加jar包

<property><name>hive.aux.jars.path</name><value>file:///opt/module/hive/lib/app_logs_hive.jar</value>
</property>
<property><name>hive.aux.jars.path</name><value>file:///opt/module/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar,file:///opt/module/hive/lib/app_logs_hive.jar</value>
</property>

添加函数

create function getdaybegin AS 'com.atguigu.hive.DayBeginUDF';
create function getweekbegin AS 'com.atguigu.hive.WeekBeginUDF';
create function getmonthbegin AS 'com.atguigu.hive.MonthBeginUDF';
create function formattime AS 'com.atguigu.hive.FormatTimeUDF';

7、测试今日新增、昨日新增等统计指标sql

select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;

。。。。。。。指标sql略

8、启动hiveserver2

/opt/modules/ap/apache-hive-1.2.1-bin/bin/hiveserver2 &

9、启动visulise_web工程

大数据项目实践--手机日志分析相关推荐

  1. 大数据时代的全能日志分析专家--Splunk安装与实践

    大数据时代的全能日志分析专家 --Splunk安装与实践 0.背  景 随着大家对网络安全意识的提高,企业网管理人员,必须对IT基础设置进行监控及安全事件的管理,管理数据的数量和种类非常巨大,那么就需 ...

  2. 大数据项目实践过程笔记

    开发工具intelijidea 2.19.3 目前围绕Hadoop体系的大数据架构包括: 传统大数据架构 数据分析的业务没有发生任何变化,但是因为数据量.性能等问题导致系统无法正常使用,需要进行升级改 ...

  3. 大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

    一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS ...

  4. 大数据项目实践:基于hadoop+spark+mongodb+mysql+c#开发医院临床知识库系统

    从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存 ...

  5. 基于大数据电商平台日志分析

    一.项目介绍 1.1 项目介绍 本次实训,要求使用Hadoop及其生态圈相关的组件来实现企业级大数据开发的整套流程,即数据的采集.数据的存储.数据的分析处理及数据的可视化.其中数据的采集部分会介绍两种 ...

  6. 基于大数据审计的信息安全日志分析法

    大数据信息安全日志审计分析方法 1.海量数据采集.大数据采集过程的主要特点和挑战是并发数高,因此采集数据量较大时,分析平台的接收性能也将面临较大挑战.大数据审计平台可采用大数据收集技术对各种类型的数据 ...

  7. Hive大数据项目实践

    在搭建了Hadoop和hive环境后,就可以使用hive来进行数据库相关操作了.Hive提供了hql(类sql)语句来操作,基本过程与mysql类似,区别的就是对于hive中的聚合操作,将使用hado ...

  8. Splunk—云计算大数据时代的超级日志分析和监控利器

          信息科技的不断进步,一方面使得银行业信息和数据逻辑集中程度不断得到提高,另一方面又成为银行业稳健运行的一大安全隐患.Splunk作为智能的IT管理运维平台,能够帮助银行业积极迎接.应对和解 ...

  9. 大数据项目之电商分析平台(2)

    第三章  .程序框架解析 3.1.模块分析 3.1.1.commons模块 1. conf 包 代码清单 3-1 ConfigurationManager类 /** * 配置工具类 */ object ...

最新文章

  1. 深度学习需要掌握的 13 个概率分布(附代码)
  2. 金山网盾监测:游戏玩家下载西西游戏外挂会中大量***
  3. 无线充电系统的功率与效率
  4. 2020-11-18(失败的一天)
  5. 攻防世界-Misc-stegano(巨详细.零基础)
  6. Ubuntu 安装Jdk(apt-get)
  7. java 成员变量的初始化_Java类变量和成员变量初始化过程
  8. oracle 创建新库时报错:enterprise manager 配置失败
  9. 计算机教学理论研究,计算机辅助教学理论实践研究.doc
  10. POJ 2433 枚举
  11. mac python安装第三方库jupyter_Mac搭建jupyter环境
  12. [Git] warning: Clone succeeded, but checkout failed.
  13. android监听键盘的隐藏,Android监听软键盘的显示和隐藏
  14. HTML期末作业,仿b站视频项目模板(HTML+CSS+JS)
  15. 技术向Technical Artist(TA)如何学习——分享个人经历
  16. html英文读法,classin读音发音 classln英文怎么读?
  17. MeGUI 压片之新手上路
  18. 制作条形码的手机App推荐
  19. 【NOIP2018提高组D2T2】填数游戏
  20. UE学习笔记(一)UC++基础类

热门文章

  1. 一文带你深刻的进入python,并且了解python的优缺点
  2. 数字图像处理课程作业二-车牌识别
  3. esp寄存器 linux,Linux的中断和系统调用 esp、eip等寄存器
  4. Mac笔记本电脑电池完全充电
  5. 真的来了!百度文心一言APP在哪里下载?跟网页版有什么区别?
  6. 酒吧里常见的24种PartyOK游戏
  7. MySQL专题(学会就毕业)
  8. iPhone安装企业包出现“无法验证证书”问题解决
  9. 【自动化测试】Web自动化测试02
  10. 如何通过python实现一个web自动化测试框架?