Big Data Project Practice: Mobile App Log Analysis
I. Environment Setup
1. Download jdk-8u45-windows-x64.exe and install it to D:\Java8
2. Set JAVA_HOME to
D:\Java8\jdk1.8.0_45
3. Set HADOOP_HOME to
D:\hadoop272
4. Set up three CentOS 6.5 virtual machines: Hadoop102, Hadoop103, and Hadoop104
Edit the hosts file:
192.168.198.134 Hadoop102
192.168.198.135 Hadoop103
192.168.198.136 Hadoop104
Edit the /etc/sysconfig/network file:
NETWORKING=yes
HOSTNAME=Hadoop102
Edit the /etc/sysconfig/network-scripts/ifcfg-eth0 file:
DEVICE="eth0"
BOOTPROTO="static"
NM_CONTROLLED="yes"
ONBOOT="yes"
TYPE="Ethernet"
UUID="2dc126cb-ef2a-412e-a373-45fbe1829354"
IPADDR=192.168.198.134
GATEWAY=192.168.198.2
NETMASK=255.255.255.0
DNS1=192.168.198.2
DNS2=114.114.114.114
DNS3=8.8.8.8
5. Configure passwordless SSH login keys among the three hosts
6. Upload jdk1.8.0_45 to /usr/java
7. Upload hadoop272 to /opt/modules/ap
8. Configure the six Hadoop configuration files
9. Upload Hive, Flume, Kafka, and ZooKeeper to /opt/modules/ap and extract them
10. Configure environment variables in /etc/profile:
export JAVA_HOME=/usr/java/jdk1.8.0_45
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export HADOOP_HOME=/opt/modules/ap/hadoop272
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/lib:$HADOOP_HOME/lib/native
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export HDFS_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export YARN_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export KAFKA_HOME=/opt/modules/ap/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin
export ZOOKEEPER_HOME=/opt/modules/ap/zookeeper-3.4.10
export PATH=$PATH:$ZOOKEEPER_HOME/bin
export HIVE_HOME=/opt/modules/ap/apache-hive-1.2.1-bin
export PATH=$PATH:$HIVE_HOME/bin
export FLUME_HOME=/opt/modules/ap/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
11. For installing and configuring ZooKeeper, Hive, Kafka, and Flume, see my related articles; only the changes and additions required by version differences are listed below.
12. Because the Hive version used here is relatively new, you must run
schematool -dbType mysql -initSchema
to initialize the MySQL metastore database manually; otherwise you will get:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
13. Also, after configuring the metastore, start the metastore service in the background before using Hive:
hive --service metastore &
14. Install IntelliJ IDEA on Windows 10 (download IDEA 2019.1)
15. Activate IDEA (steps omitted)
16. Open the project D:\idea project\app_statistics_project
II. Project Overview
Compute activity metrics for a mobile app.
Key terms: developer (tenant), app, SDK, user, active user, silent user, loyal user, churned user, retained user, user freshness, launch count, single-session usage time, daily usage time
Functional modules:
1) Collect mobile app logs
2) Periodically analyze business metrics offline
3) Display the results
Workflow:
1) When the mobile app starts, it reports startup logs, error logs, page logs, event logs, and usage-duration logs to the log collection server.
2) The log collection server forwards the collected logs to Kafka.
3) Flume consumes the five Kafka topics and stores the data on HDFS.
4) A crontab job periodically loads the data from HDFS into the Hive data warehouse.
5) The core business analysis runs as Hive queries.
6) Query results are shown on a data display platform.
III. Project Structure
Project: app_statistics_project
Module 1: app_logs_common — entity classes and geo-information lookup
–AppBaseLog: base class for log messages
creation time, app ID, tenant ID, device ID, app version, install channel, OS type, OS version, device model
–AppErrorLog: error log
error brief, error detail
–AppEventLog: event log
event ID, duration, parameters (key/value)
–AppPageLog: page log
page ID, visit index, next page, stay duration
–AppStartupLog: startup log
country, province, IP address, carrier, brand, screen resolution
–AppUsageLog: usage-duration log
single-use duration, single-upload traffic, single-download traffic
–AppLogEntity: aggregate of the five log arrays
private AppStartupLog[] appStartupLogs;
private AppPageLog[] appPageLogs;
private AppEventLog[] appEventLogs;
private AppUsageLog[] appUsageLogs;
private AppErrorLog[] appErrorLogs;
–GeoInfo: geographic information
country, province
–GeoUtil: geo-information utility
Load country data:
public static String getCountry(String ip)
Load province data:
public static String getProvince(String ip)
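The outline gives only the signatures; the real lookups read the GeoLite2-City.mmdb database mentioned in the packaging notes. Below is a hedged sketch of the interface only, with a tiny hypothetical in-memory table standing in for the database reader — the sample entries are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of GeoUtil's interface. The real implementation resolves IPs
// against GeoLite2-City.mmdb; here a hard-coded table (hypothetical
// entries) stands in so the lookup shape is visible and testable.
public class GeoUtilSketch {
    private static final Map<String, String[]> TABLE = new HashMap<>();
    static {
        // hypothetical sample rows: ip -> {country, province}
        TABLE.put("192.168.198.134", new String[]{"china", "jiangxi"});
        TABLE.put("8.8.8.8", new String[]{"America", "Washington"});
    }

    public static String getCountry(String ip) {
        String[] geo = TABLE.get(ip);
        return geo == null ? "unknown" : geo[0];
    }

    public static String getProvince(String ip) {
        String[] geo = TABLE.get(ip);
        return geo == null ? "unknown" : geo[1];
    }

    public static void main(String[] args) {
        System.out.println(getCountry("8.8.8.8"));   // America
        System.out.println(getProvince("10.0.0.1")); // unknown
    }
}
```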
Module 2: app_logs_client — mobile client module that generates simulated logs
–GenerateData: data generation class
log creation time, unique app ID, unique tenant ID, unique device ID, version, channel, OS type, OS version, device model
Initial or random values for the log fields:
private static Long[] createdAtMsS = initCreatedAtMs();          // log creation times
private static String appId = "sdk34734";                        // unique app ID
private static String[] tenantIds = {"cake"};                    // unique tenant IDs (enterprise customers)
private static String[] deviceIds = initDeviceId();              // unique device IDs
private static String[] appVersions = {"3.2.1", "3.2.2"};        // versions
private static String[] appChannels = {"youmeng1", "youmeng2"};  // install channels, fixed in the manifest at install time (App Store, etc.)
private static String[] appPlatforms = {"android", "ios"};       // platforms
private static String[] osTypes = {"8.3", "7.1.1"};              // OS versions
private static String[] deviceStyles = {"iPhone 6", "iPhone 6 Plus", "红米手机1s"}; // device models

// initialize the device IDs
private static String[] initDeviceId()
// initialize the creation times
private static Long[] initCreatedAtMs()

// startup log field values
private static String[] countrys = {"America", "china"};                  // country; filled in by the server, not reported by the client
private static String[] provinces = {"Washington", "jiangxi", "beijing"}; // province; filled in by the server, not reported by the client
private static String[] networks = {"WiFi", "CellNetwork"};               // network types
private static String[] carriers = {"中国移动", "中国电信", "EE"};          // carriers
private static String[] brands = {"三星", "华为", "Apple", "魅族", "小米", "锤子"}; // brands
private static String[] screenSizes = {"1136*640", "960*640", "480*320"}; // screen resolutions

// event log field values
private static String[] eventIds = {"popMenu", "autoImport", "BookStore"};             // unique event IDs
private static Long[] eventDurationSecsS = {new Long(25), new Long(67), new Long(45)}; // event durations
static Map<String, String> map1 = new HashMap<String, String>() {{
    put("testparam1key", "testparam1value");
    put("testparam2key", "testparam2value");
}};
static Map<String, String> map2 = new HashMap<String, String>() {{
    put("testparam3key", "testparam3value");
    put("testparam4key", "testparam4value");
}};
private static Map[] paramKeyValueMapsS = {map1, map2}; // parameter key/value maps

// usage log field values
private static Long[] singleUseDurationSecsS = initSingleUseDurationSecs(); // single-use duration in seconds: how long the app stays in the foreground during one launch
// initialize the single-use durations
private static Long[] initSingleUseDurationSecs()

// error log field values
private static String[] errorBriefs = {
    "at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)",
    "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"
}; // error briefs
private static String[] errorDetails = {
    "java.lang.NullPointerException\\n " +
        "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " +
        "at cn.lift.dfdf.web.AbstractBaseController.validInbound",
    "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " +
        "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" +
        " at java.lang.reflect.Method.invoke(Method.java:606)\\n"
}; // error details

// page log field values
private static String[] pageIds = {"list.html", "main.html", "test.html"};         // page IDs
private static int[] visitIndexs = {0, 1, 2, 3, 4};                                // visit index; 0 is the first page of a session
private static String[] nextPages = {"list.html", "main.html", "test.html", null}; // next page; null marks the page on which the app exited
private static Long[] stayDurationSecsS = {new Long(45), new Long(2), new Long(78)}; // stay duration on the current page
Arrays of generated sample logs:
// initialize the five kinds of log data
private static AppStartupLog[] appStartupLogs = initAppStartupLogs(); // startup logs
private static AppPageLog[] appPageLogs = initAppPageLogs();          // page logs
private static AppEventLog[] appEventLogs = initAppEventLogs();       // event logs
private static AppUsageLog[] appUsageLogs = initAppUsageLogs();       // usage logs
private static AppErrorLog[] appErrorLogs = initAppErrorLogs();       // error logs
// initialize the fields shared by every log type
private static void initLogCommon(AppBaseLog baselog) {
    // log creation time
    baselog.setCreatedAtMs(System.currentTimeMillis());
    // app ID
    baselog.setAppId(appId);
    // tenant ID (the enterprise customer)
    baselog.setTenantId(tenantIds[random.nextInt(tenantIds.length)]);
    // device ID
    baselog.setDeviceId(deviceIds[random.nextInt(deviceIds.length)]);
    // version
    baselog.setAppVersion(appVersions[random.nextInt(appVersions.length)]);
    // channel
    baselog.setAppChannel(appChannels[random.nextInt(appChannels.length)]);
    // platform
    baselog.setAppPlatform(appPlatforms[random.nextInt(appPlatforms.length)]);
    // OS version
    baselog.setOsType(osTypes[random.nextInt(osTypes.length)]);
    // device model
    baselog.setDeviceStyle(deviceStyles[random.nextInt(deviceStyles.length)]);
}
The init function for each log type:
// startup logs
private static AppStartupLog[] initAppStartupLogs()
// page logs
private static AppPageLog[] initAppPageLogs()
// event logs
private static AppEventLog[] initAppEventLogs()
// usage logs
private static AppUsageLog[] initAppUsageLogs()
// error logs
private static AppErrorLog[] initAppErrorLogs()
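Each init function follows the same pattern: create a batch of log objects, fill the shared fields via initLogCommon, then draw each type-specific field at random from the sample arrays above. An illustrative stand-in for initAppStartupLogs() — the entity class is stripped down and the batch size of 10 is an assumption, not the project's actual count:

```java
import java.util.Random;

// Illustrative stand-in for GenerateData's initAppStartupLogs().
// The point is the pattern: fill common fields, then draw each
// type-specific field at random from the sample arrays.
public class InitSketch {
    static class AppStartupLog {
        String network;
        String brand;
        // remaining fields elided
    }

    private static final Random random = new Random();
    private static final String[] networks = {"WiFi", "CellNetwork"};
    private static final String[] brands = {"三星", "华为", "Apple"};

    // assumed batch size of 10 logs per run
    static AppStartupLog[] initAppStartupLogs() {
        AppStartupLog[] logs = new AppStartupLog[10];
        for (int i = 0; i < logs.length; i++) {
            AppStartupLog log = new AppStartupLog();
            // initLogCommon(log); // would fill the shared AppBaseLog fields
            log.network = networks[random.nextInt(networks.length)];
            log.brand = brands[random.nextInt(brands.length)];
            logs[i] = log;
        }
        return logs;
    }

    public static void main(String[] args) {
        System.out.println(initAppStartupLogs().length); // 10
    }
}
```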
Send the data:
// main loop that sends the data
public static void main(String[] args)
–UploadUtil: log upload utility
// upload a log batch
public static void upload(String json)
URL url = new URL("http://hadoop102:8080/app_logs/coll/index");
Module 3: app_logs_collect_web — Spring MVC web application that receives and displays the data
–Log collection controller:
@Controller
@RequestMapping("/coll")
public class CollectLogController
// receive the uploaded data
public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req)
// correct the timestamps
private void verifyTime(AppLogEntity e, HttpServletRequest req)
// fix the client IP address
private void processIp(AppLogEntity e, HttpServletRequest req)
// cache of geo information per IP
private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();
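verifyTime corrects client-reported timestamps against the server clock. The exact scheme is not shown in this outline; one common approach — sketched below under the assumption that the client also reports its upload time — measures the clock skew on receipt and shifts every createdAtMs by it:

```java
// Hedged sketch of server-side timestamp correction. The skew-based
// scheme and the clientUploadMs parameter are assumptions for
// illustration, not necessarily what the project's verifyTime does.
public class TimeFix {
    /**
     * Returns corrected log timestamps. skew = serverNowMs - clientUploadMs
     * is positive when the client clock runs behind the server clock.
     */
    public static long[] correct(long[] createdAtMs, long clientUploadMs, long serverNowMs) {
        long skew = serverNowMs - clientUploadMs;
        long[] fixed = new long[createdAtMs.length];
        for (int i = 0; i < createdAtMs.length; i++) {
            fixed[i] = createdAtMs[i] + skew; // shift into server time
        }
        return fixed;
    }

    public static void main(String[] args) {
        // client clock 500 ms behind: a log stamped 1000 becomes 1500
        System.out.println(correct(new long[]{1000L}, 2000L, 2500L)[0]); // 1500
    }
}
```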
Kafka topic constants:
public class Constants {
    // topics
    public static final String TOPIC_APP_STARTUP = "topic_app_startup";
    public static final String TOPIC_APP_ERRROR = "topic_app_error";
    public static final String TOPIC_APP_EVENT = "topic_app_event";
    public static final String TOPIC_APP_USAGE = "topic_app_usage";
    public static final String TOPIC_APP_PAGE = "topic_app_page";
}
// send the collected messages to Kafka
private void sendMessage(AppLogEntity e)
// send one category of logs to its Kafka topic
private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs)
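sendMessage fans the five log arrays of an AppLogEntity out to the topics above, calling sendSingleLog once per category. A minimal sketch of the per-category loop, with the Kafka producer abstracted behind a callback so it runs without a broker — the String[] payloads and the BiConsumer are simplifications; the real method serializes each AppBaseLog to JSON and calls producer.send(new ProducerRecord<>(topic, json)):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative version of sendSingleLog: hand one (topic, payload)
// pair per log object to a send function. In the real controller the
// send function wraps KafkaProducer.send(...).
public class SendSketch {
    static void sendSingleLog(BiConsumer<String, String> send, String topic, String[] jsonLogs) {
        for (String json : jsonLogs) {
            send.accept(topic, json); // one Kafka record per log object
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        sendSingleLog((t, j) -> out.add(t + ":" + j), "topic_app_startup", new String[]{"{}", "{}"});
        System.out.println(out.size()); // 2
    }
}
```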
Module 4: app_logs_flume — a Flume interceptor that distinguishes the log types arriving from Kafka
Class: public class LogCollInterceptor implements Interceptor
// classify each Flume event into one of the five log types
public Event intercept(Event event)
Flume configuration:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hxzy.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics = topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
# avoid producing lots of small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# write plain text files instead of SequenceFiles
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Module 5: Hive data processing
Partitioned tables:
tab_name
ext_error_logs
ext_event_logs
ext_page_logs
ext_startup_logs
ext_usage_logs
Class: DateUtil
Java date helper functions:
// return the start of the day containing the given time
public static Date getDayBeginTime(Date d)
public static Date getDayBeginTime(Date d, int offset)
// return the start of the week containing the given time
public static Date getWeekBeginTime(Date d)
public static Date getWeekBeginTime(Date d, int offset)
// return the start of the month containing the given time
public static Date getMonthBeginTime(Date d)
public static Date getMonthBeginTime(Date d, int offset)
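The outline lists only the signatures. A plausible Calendar-based implementation of the two day variants — a sketch, not necessarily the project's exact code; the week and month variants differ only in which calendar fields are reset:

```java
import java.util.Calendar;
import java.util.Date;

// Sketch of DateUtil.getDayBeginTime using java.util.Calendar.
public class DateUtilSketch {
    // start of the day containing d (local time zone)
    public static Date getDayBeginTime(Date d) {
        Calendar c = Calendar.getInstance();
        c.setTime(d);
        c.set(Calendar.HOUR_OF_DAY, 0);
        c.set(Calendar.MINUTE, 0);
        c.set(Calendar.SECOND, 0);
        c.set(Calendar.MILLISECOND, 0);
        return c.getTime();
    }

    // start of the day `offset` days away (offset may be negative)
    public static Date getDayBeginTime(Date d, int offset) {
        Calendar c = Calendar.getInstance();
        c.setTime(getDayBeginTime(d));
        c.add(Calendar.DAY_OF_MONTH, offset);
        return c.getTime();
    }

    public static void main(String[] args) {
        System.out.println(getDayBeginTime(new Date()));
        System.out.println(getDayBeginTime(new Date(), -1)); // start of yesterday
    }
}
```

This is also the shape the getdaybegin() UDF below delegates to: getdaybegin(1) is the start of tomorrow, so "today" is the half-open interval [getdaybegin(), getdaybegin(1)).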
UDF functions
Class:
class DayBeginUDF extends UDF {
    // start of today, in milliseconds
    public long evaluate() throws ParseException {
        return evaluate(new Date());
    }

    // start of the day `offset` days from today
    public long evaluate(int offset) throws ParseException {
        return evaluate(DateUtil.getDayBeginTime(new Date(), offset));
    }

    // start of the given day, Date argument (milliseconds)
    public long evaluate(Date d) throws ParseException {
        return DateUtil.getDayBeginTime(d).getTime();
    }

    // start of the given day, Date argument with a day offset
    public long evaluate(Date d, int offset) throws ParseException {
        return DateUtil.getDayBeginTime(d, offset).getTime();
    }

    // start of the given day, String argument
    public long evaluate(String dateStr) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date d = sdf.parse(dateStr);
        return evaluate(d);
    }

    // start of the given day, String argument with a day offset
    public long evaluate(String dateStr, int offset) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date d = sdf.parse(dateStr);
        return DateUtil.getDayBeginTime(d, offset).getTime();
    }

    // start of the given day, String argument with an explicit date format
    public long evaluate(String dateStr, String fmt) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat(fmt);
        Date d = sdf.parse(dateStr);
        return DateUtil.getDayBeginTime(d).getTime();
    }

    // start of the given day, String argument with a format and a day offset
    public long evaluate(String dateStr, String fmt, int offset) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat(fmt);
        Date d = sdf.parse(dateStr);
        return DateUtil.getDayBeginTime(d, offset).getTime();
    }
}
Class: public class WeekBeginUDF extends UDF
Class: public class MonthBeginUDF extends UDF
Class: public class FormatTimeUDF extends UDF
SQL:
New-user statistics:
Daily new users:
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;
Monthly new users:
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getmonthbegin() and mintime < getmonthbegin(1)
)t ;
Daily active users:
select
count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
Daily active users for each day within one week:
select
formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getweekbegin() and createdatms < getweekbegin(1)
group by formattime(createdatms,'yyyy/MM/dd');
Users active for three consecutive weeks:
select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where appid = 'sdk34734'
and concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd')
group by deviceid
Silent users (launched only once, and not recently):
select
count(*)
from
(select deviceid , count(createdatms) dcount,min(createdatms) dmin
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having dcount = 1 and dmin < getdaybegin(-1)
)t;
Module 6: web visualization
Spring MVC beans.xml configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.3.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-4.3.xsd">

    <!-- scan the service package -->
    <context:component-scan base-package="com.hxzy.applogs.visualize.service" />

    <!-- Hive data source -->
    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
        <property name="driverClass" value="org.apache.hive.jdbc.HiveDriver" />
        <property name="jdbcUrl" value="jdbc:hive2://192.168.1.103:10000/applogsdb" />
        <property name="user" value="hxzy" />
        <property name="password" value="" />
    </bean>

    <bean id="sqlSessionFactoryBean" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="configLocation" value="classpath:mybatis-config.xml" />
    </bean>

    <bean id="statMapper" class="org.mybatis.spring.mapper.MapperFactoryBean">
        <property name="mapperInterface" value="com.hxzy.applogs.visualize.dao.StatMapper" />
        <property name="sqlSessionFactory" ref="sqlSessionFactoryBean" />
    </bean>
</beans>
Configure MyBatis:
mybatis-config.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <mappers>
        <mapper resource="StatMapper.xml" />
    </mappers>
</configuration>
StatMapper.xml
<mapper namespace="com.hxzy.applogs.visualize.dao.StatMapper">
    <!-- total new users -->
    <select id="findNewUsers" resultMap="rm_StatBean">
        select count(*) stcount from ext_startup_logs
    </select>
    <resultMap id="rm_StatBean" type="com.hxzy.applogs.visualize.domain.StatBean">
        <result column="stcount" property="count" />
    </resultMap>

    <!-- new users per day of the current week -->
    <select id="findThisWeekNewUsers" resultMap="rm_weekUser">
        select formattime(t.mintime,'yyyy/MM/dd') stdate , count(*) stcount
        from (
            select deviceid, min(createdatms) mintime
            from ext_startup_logs
            where appid = #{appid}
            group by deviceid
            having mintime &gt;= getweekbegin() and mintime &lt; getweekbegin(1)
        ) t
        group by formattime(t.mintime,'yyyy/MM/dd')
    </select>
    <resultMap id="rm_weekUser" type="com.hxzy.applogs.visualize.domain.StatBean">
        <result column="stcount" property="count" />
        <result column="stdate" property="date" />
    </resultMap>
</mapper>
Class: StatController
// count new users for each day of the current week
public Map<String, Object> stat3()
Class 2: statistics service
@Service("statService")
public class StatServiceImpl implements StatService
IV. Debugging the client, collect_web, and Flume interceptor modules
1. app_logs_collect_web
Issue 1:
package xxxx not found
Fix: the app_logs_common module name was misspelled in pom.xml; correct it.
Issue 2:
Error:(31, 46) java: cannot find symbol — symbol: class AppLogEntity, location: class com.atguigu.applogs.collect.web.controller.CollectLogController
Fix: add the class as described in the documentation.
2. app_logs_client --> GenerateData
Issue 1:
Error:java: Compilation failed: internal java compiler error
Fix: set the language level of both the project and the module to 7 (Diamonds).
Issue 2: method setSingleUseDurationSecs not found
Fix: add the missing method per the documentation.
Issue 3:
java.net.ConnectException: Connection timed out: connect
The remote server could not be reached.
Fix: point the request at the local server:
URL url = new URL("http://localhost:8080/coll/index"); // test address
Issue 4:
The received response code is always 404.
Debugging approach:
In CollectLogController's collect action, add console output to verify that the parameters arrive:
System.out.println("logEntity.setAppStartupLogs");
In GenerateData's main method, add console output to monitor execution:
System.out.println("logEntity.setAppStartupLogs");
AppStartupLog[] a = new AppStartupLog[]{appStartupLogs[random.nextInt(appStartupLogs.length)]};
logEntity.setAppStartupLogs(a);
System.out.println(a[0].getCountry());
JSON sent:
{"appErrorLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","createdAtMs":1597834893609,"deviceId":"device2265","deviceStyle":"红米手机1s","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)","errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","osType":"8.3","tenantId":"cake"}],"appEventLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device2222","deviceStyle":"红米手机1s","eventDurationSecs":45,"eventId":"autoImport","osType":"8.3","paramKeyValueMap":{"testparam3key":"testparam3value","testparam4key":"testparam4value"},"tenantId":"cake"}],"appPageLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.1","createdAtMs":1597834893608,"deviceId":"device2248","deviceStyle":"红米手机1s","nextPage":"list.html","osType":"8.3","pageId":"main.html","stayDurationSecs":45,"tenantId":"cake","visitIndex":2}],"appStartupLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","brand":"小米","carrier":"EE","country":"America","createdAtMs":1597834893608,"deviceId":"device2274","deviceStyle":"iPhone 6 Plus","network":"WiFi","osType":"7.1.1","province":"Washington","screenSize":"480*320","tenantId":"cake"}],"appUsageLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device224","deviceStyle":"iPhone 6 Plus","osType":"7.1.1","singleUseDurationSecs":93,"tenantId":"cake"}]}
System.out.println(json);
Resolution: the data was generated correctly, but the action never received it.
1. Change the URL to:
http://localhost:8080/app_logs_collect_web_war_exploded/coll/index
The console output then showed an exception while sending data to Kafka; since the VM servers were not yet running, this is expected, so temporarily comment out the Kafka send step:
//sendMessage(e);
Debugging of the app_logs_client module is complete.
3. Configure server.properties for the Kafka cluster (broker.id must be unique on each host):
delete.topic.enable=true
broker.id=0
log.dirs=/opt/modules/ap/kafka_2.11-0.11.0.0/logs
zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181
num.recovery.threads.per.data.dir=1
num.partitions=1
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
num.io.threads=8
num.network.threads=3
Start the Kafka server on each of the three hosts:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /opt/modules/ap/kafka_2.11-0.11.0.0/config/server.properties &
Create the topics:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_startup
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_error
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_event
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_usage
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_page
Start a console consumer for the topic_app_startup topic:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper Hadoop102:2181 --topic topic_app_startup
Issue 1: an exception occurs after starting a new consumer
Fix: consumer.properties was not configured; add
zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181
Issue 2: the controller fails to send messages
Fix: producer.properties was not configured; add
bootstrap.servers=Hadoop102:9092,Hadoop103:9092,Hadoop104:9092
Issue 3: stopping Kafka with the kafka-server-stop.sh script fails
Fix: edit the script and replace the process pattern "kafka.kafka" with "kafka_2.11-0.11.0.0"
After fixing these three issues, the consumer receives topic_app_startup messages normally.
4. Package the debugged app_logs_collect_web module as a WAR and deploy it to Tomcat on hadoop102
Check that pom.xml declares the war packaging type
— clean the project before packaging
Issue 1: app_logs_common.jar cannot be found
Fix: build the app_logs_common jar first and run the Maven install goal
Issue 2: could not copy GeoLite2-City.mmdb ... the requested operation cannot be performed on a file with a user-mapped section open
Fix: stop the running application before packaging; the build then succeeds
Copy the WAR to /usr/tomcat/webapps on Hadoop102
Start Tomcat from /usr/tomcat/bin/
Change the request URL in app_logs_client's UploadUtil to
http://hadoop102:8080/app_logs_collect_web/coll/index
5. Flume consumes the five Kafka topics and writes them to HDFS, using an interceptor to route each event to its storage path
P.S. You could instead run five Flume agents, one per topic, but to reduce maintenance cost the project defines the LogCollInterceptor class, which adds a logType key to each Event header. The core intercept function:
public class LogCollInterceptor implements Interceptor {
    ...
    /**
     * Modifies events in-place.
     */
    public Event intercept(Event event) {
        // 1. get the Flume event headers
        Map<String, String> headers = event.getHeaders();
        // 2. get the JSON byte array received by Flume
        byte[] json = event.getBody();
        // convert it to a string
        String jsonStr = new String(json);
        // pageLog
        String logType = "";
        if (jsonStr.contains("pageId")) {
            logType = "page";
        }
        // eventLog
        else if (jsonStr.contains("eventId")) {
            logType = "event";
        }
        // usageLog
        else if (jsonStr.contains("singleUseDurationSecs")) {
            logType = "usage";
        }
        // errorLog
        else if (jsonStr.contains("errorBrief")) {
            logType = "error";
        }
        // startupLog
        else if (jsonStr.contains("network")) {
            logType = "startup";
        }
        // 3. store the log type in the Flume header
        headers.put("logType", logType);
        // quick debugging: print the json and the logType
        System.out.println("json:" + jsonStr);
        System.out.println("logType:" + logType);
        return event;
    }
}
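The routing depends only on which marker field appears in the JSON body, so it can be sanity-checked outside Flume. Note that "network" is the least specific marker and must be tested last; the stand-alone sketch below preserves that order:

```java
// Stand-alone copy of the interceptor's keyword routing, so the
// branch order can be verified without running a Flume agent.
public class LogTypeSketch {
    public static String classify(String jsonStr) {
        if (jsonStr.contains("pageId")) return "page";
        if (jsonStr.contains("eventId")) return "event";
        if (jsonStr.contains("singleUseDurationSecs")) return "usage";
        if (jsonStr.contains("errorBrief")) return "error";
        if (jsonStr.contains("network")) return "startup"; // generic field, checked last
        return ""; // unknown payload
    }

    public static void main(String[] args) {
        System.out.println(classify("{\"pageId\":\"main.html\"}"));           // page
        System.out.println(classify("{\"errorBrief\":\"npe\",\"network\":\"WiFi\"}")); // error
    }
}
```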
Package the app_logs_flume module as app_logs_flume_1.0.jar and copy it to
/opt/modules/ap/apache-flume-1.7.0-bin/lib so the interceptor class is on Flume's classpath.
Configure flume-applog-conf.properties with the Kafka source and the interceptor:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics = topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/usr/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
# avoid producing lots of small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# write plain text files instead of SequenceFiles
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent:
/opt/modules/ap/apache-flume-1.7.0-bin/bin/flume-ng agent -f /opt/modules/ap/apache-flume-1.7.0-bin/conf/flume-applog-conf.properties -n a1
V. Debugging the Hive and visualization projects
1. Adjust the Hive configuration: add JSON support and disable output compression
(1) Copy json-serde-1.3.8-jar-with-dependencies.jar into /opt/modules/ap/apache-hive-1.2.1-bin/lib
(2) Add the following to hive-site.xml:
<property>
    <name>hive.aux.jars.path</name>
    <value>/opt/modules/ap/apache-hive-1.2.1-bin/lib/json-serde-1.3.8-jar-with-dependencies.jar</value>
</property>
<property>
    <name>hive.exec.compress.output</name>
    <value>false</value>
</property>
2. Create the applogsdb database:
drop database applogs_db;
create database applogsdb;
use applogsdb;
3. Create the external partitioned tables:
--startup
CREATE EXTERNAL TABLE ext_startup_logs (
    createdAtMs bigint,
    appId string,
    tenantId string,
    deviceId string,
    appVersion string,
    appChannel string,
    appPlatform string,
    osType string,
    deviceStyle string,
    country string,
    province string,
    ipAddress string,
    network string,
    carrier string,
    brand string,
    screenSize string
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

--error
CREATE EXTERNAL TABLE ext_error_logs (
    createdAtMs bigint,
    appId string,
    tenantId string,
    deviceId string,
    appVersion string,
    appChannel string,
    appPlatform string,
    osType string,
    deviceStyle string,
    errorBrief string,
    errorDetail string
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

--event
CREATE EXTERNAL TABLE ext_event_logs (
    createdAtMs bigint,
    appId string,
    tenantId string,
    deviceId string,
    appVersion string,
    appChannel string,
    appPlatform string,
    osType string,
    deviceStyle string,
    eventId string,
    eventDurationSecs bigint,
    paramKeyValueMap Map<string,string>
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

--page
CREATE EXTERNAL TABLE ext_page_logs (
    createdAtMs bigint,
    appId string,
    tenantId string,
    deviceId string,
    appVersion string,
    appChannel string,
    appPlatform string,
    osType string,
    deviceStyle string,
    pageViewCntInSession int,
    pageId string,
    visitIndex int,
    nextPage string,
    stayDurationSecs bigint
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

--usage
CREATE EXTERNAL TABLE ext_usage_logs (
    createdAtMs bigint,
    appId string,
    tenantId string,
    deviceId string,
    appVersion string,
    appChannel string,
    appPlatform string,
    osType string,
    deviceStyle string,
    singleUseDurationSecs bigint,
    singleUploadTraffic bigint,
    singleDownloadTraffic bigint
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
4. Write a scheduled script, hdfstohivebyminute.sh, that loads the HDFS data into the matching Hive partitions once a minute:
#!/bin/bash
systime=`date -d "-3 minute" +%Y%m-%d-%H%M`
ym=`echo ${systime} | awk -F '-' '{print $1}'`
day=`echo ${systime} | awk -F '-' '{print $2}'`
hm=`echo ${systime} | awk -F '-' '{print $3}'`
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/startup/${ym}/${day}/${hm}' into table applogsdb.ext_startup_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/error/${ym}/${day}/${hm}' into table applogsdb.ext_error_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/event/${ym}/${day}/${hm}' into table applogsdb.ext_event_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/usage/${ym}/${day}/${hm}' into table applogsdb.ext_usage_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/page/${ym}/${day}/${hm}' into table applogsdb.ext_page_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
Convert the script to UNIX line endings:
yum install -y dos2unix
dos2unix /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohivebyminute.sh
Add the following entry to the Linux cron table (/etc/crontab):
* * * * * source /etc/profile; /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohivebyminute.sh
chmod -R 777 /opt/modules/ap/apache-hive-1.2.1-bin/
service crond start
Check whether data is arriving in Hive:
use applogsdb;
select * from ext_startup_logs;
5. Write the UDF functions
(omitted)
6. Export the UDF jar app_logs_hive.jar to Hive's lib directory
Register the jar in the current session:
hive>add jar /opt/modules/ap/apache-hive-1.2.1-bin/lib/app_logs_hive.jar;
Or register the jars permanently in hive-site.xml; since hive.aux.jars.path may appear only once, list both jars in a single comma-separated value:
<property>
    <name>hive.aux.jars.path</name>
    <value>file:///opt/modules/ap/apache-hive-1.2.1-bin/lib/json-serde-1.3.8-jar-with-dependencies.jar,file:///opt/modules/ap/apache-hive-1.2.1-bin/lib/app_logs_hive.jar</value>
</property>
Register the functions:
create function getdaybegin AS 'com.atguigu.hive.DayBeginUDF';
create function getweekbegin AS 'com.atguigu.hive.WeekBeginUDF';
create function getmonthbegin AS 'com.atguigu.hive.MonthBeginUDF';
create function formattime AS 'com.atguigu.hive.FormatTimeUDF';
7. Test the metric SQL (today's new users, yesterday's new users, etc.):
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;
... (remaining metric SQL omitted)
8. Start hiveserver2:
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hiveserver2 &
9. Start the visualize_web project