启动服务组件
zk-hadoop-kafka-flume-hbase-sparkstreaming(eclipse操作)

前端服务启动(eclipse操作–用前端js代码监听页面上的操作信息)

控制台测试效果

-------------------------------------------
Time: 1664693880000 ms
-------------------------------------------
http://localhost:8080/FluxAppServer/b.jsp|b.jsp|页面B|UTF-8|1920x1080|24-bit|en|0|1||0.05643445047847484|http://localhost:8080/FluxAppServer/a.jsp|Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36|59844165863196852806|9581557787_2_1664693872485|0:0:0:0:0:0:0:1
http://localhost:8080/FluxAppServer/b.jsp|b.jsp|页面B|UTF-8|1920x1080|24-bit|en|0|1||0.6594139001727657|http://localhost:8080/FluxAppServer/a.jsp|Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36|59844165863196852806|9581557787_3_1664693872657|0:0:0:0:0:0:0:1

hbase建表

hbase(main):002:0> create 'fluxtab','cf1'

运行sparkstreaming
效果

hbase(main):007:0> scan 'fluxtab'
ROW                    COLUMN+CELL                                                    1664697631822_5984416 column=cf1:cip, timestamp=1664691777265, value=0:0:0:0:0:0:0:1 5863196852806_6392526                                                                153_0:0:0:0:0:0:0:1_7                                                                7

登录mysql
建立一个新表

mysql> use weblog;
mysql> create table tongji2(reporttime date,pv int,uv int,vv int,newip int,newcust int);

代码
bean类

package cn.tedu.kafka.streaming/*** 样例类,必须声明一个主构造器,* 默认构造一个空构造器默认混入序列化特质* 默认实现toString*/case class LogBean( val url:String,val urlname:String,val uvid:String,val ssid:String,val sscount:String,val sstime:String,val cip:String) {}
package cn.tedu.kafka.streamingcase class MysqlBean(time:Long,pv:Int,uv:Int,vv:Int,newip:Int,newcust:Int) {}

MySQL数据库工具类

package cn.tedu.kafka.streamingimport com.mchange.v2.c3p0.ComboPooledDataSource
import java.sql.Connection
import java.sql.PreparedStatement
import java.text.SimpleDateFormat
import java.sql.ResultSet
import java.sql.Dateobject MysqlUtil {//-获取c3p0的连接池对象val dataSource=new ComboPooledDataSourcedef saveToMysql(mysqlBean: MysqlBean) = {var conn:Connection=nullvar ps1:PreparedStatement=nullvar rs1:ResultSet=nullvar ps2:PreparedStatement=nullvar ps3:PreparedStatement=nulltry {val sdf=new SimpleDateFormat("YYYY-MM-dd")val nowTime=sdf.format(mysqlBean.time)//获取数据连接conn=dataSource.getConnection()//查询当天的数据ps1=conn.prepareStatement("select * from tongji2 where reporttime=?")ps1.setString(1, nowTime)//执行查询rs1=ps1.executeQuery()if(rs1.next()){//表示当天已经有数据,则更新各个指标,即累加ps3=conn.prepareStatement("update tongji2 set pv=pv+?,uv=uv+?,vv=vv+?,newip=newip+?,newcust=newcust+? where reporttime=?")ps3.setInt(1,mysqlBean.pv)ps3.setInt(2,mysqlBean.uv)ps3.setInt(3,mysqlBean.vv)ps3.setInt(4,mysqlBean.newip)ps3.setInt(5,mysqlBean.newcust)ps3.setString(6,nowTime)ps3.executeUpdate()}else{//表示当天没有数据,执行插入命令ps2=conn.prepareStatement("insert into tongji2 values(?,?,?,?,?,?)")ps2.setString(1, nowTime)ps2.setInt(2, mysqlBean.pv)ps2.setInt(3, mysqlBean.uv)ps2.setInt(4, mysqlBean.vv)ps2.setInt(5, mysqlBean.newip)ps2.setInt(6, mysqlBean.newcust)//执行插入ps2.executeUpdate()}} catch {case t: Throwable => t.printStackTrace() // TODO: handle error}finally {if(ps3!=null) ps3.close()if(ps2!=null) ps2.close()if(ps1!=null) ps1.close()if(rs1!=null) rs1.close()if(conn!=null) conn.close()}}
}

hbase工具类

package cn.tedu.kafka.streamingimport org.apache.spark.SparkContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.fs.shell.find.Result
import scala.util.Random
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.client.Resultobject HBaseUtil {def saveToHBase(sc: SparkContext, bean: LogBean) = {sc.hadoopConfiguration.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort","2181")sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"fluxtab")val job=new Job(sc.hadoopConfiguration)job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[org.apache.hadoop.fs.shell.find.Result])job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])//RDD[(key,value)]val hbaseRDD=sc.makeRDD(List(bean)).map { logbean =>//创建HBase的行对象,并指定行键//本项目的行键:sstime_uvid _ssid_cip_随机数字//这样设计的目的://1.行键中包含时间戳信息,所以可以按时间维度做范围查询//2.行键中包含用户id,会话id以及ip信息,所以可以通过行键正则匹配,找到相关数据//3.行键后拼随机数字,满足散列原则,避免热点数据集中到一个HRegion中val rowKey=logbean.sstime+"_"+logbean.uvid+"_"+logbean.ssid+"_"+logbean.cip+"_"+new Random().nextInt(100)val put=new Put(rowKey.getBytes)put.add("cf1".getBytes, "url".getBytes, logbean.url.getBytes)put.add("cf1".getBytes, "urlname".getBytes, logbean.urlname.getBytes)put.add("cf1".getBytes, "uvid".getBytes, logbean.uvid.getBytes)put.add("cf1".getBytes, "ssid".getBytes, logbean.ssid.getBytes)put.add("cf1".getBytes, "sscount".getBytes, logbean.sscount.getBytes)put.add("cf1".getBytes, "sstime".getBytes, logbean.sstime.getBytes)put.add("cf1".getBytes, "cip".getBytes, logbean.cip.getBytes)(new ImmutableBytesWritable,put)}//执行插入hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)}def queryByRowRegex(sc: SparkContext, startTime: Long, endTime: Long, Regex: String) = {val hbaseConf=HBaseConfiguration.create()hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")//hbaseConf.set("hbase.zookeeper.property.clientPort","2181")//指定读取的HBase表名hbaseConf.set(TableInputFormat.INPUT_TABLE,"fluxtab")//创建HBase表扫描对象val scan=new Scan()//设置扫描范围scan.setStartRow(startTime.toString().getBytes)scan.setStopRow(endTime.toString().getBytes)//创建HBase的行键正则过滤器//①参:比较原则,有等于大于大于等于小于小于等于不等于//②参∶正则比较规则对象val filter=new RowFilter(CompareOp.EQUAL,new RegexStringComparator(Regex))//绑定过滤器到scan对象,这样一来,在扫描HBase表数据时过滤器会生效scan.setFilter(filter)//设置Scan对象hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))//执行查询,并将结果集封装到RDD中val resultRDD=sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])//返回结果集resultRDD}
}

整体架构—driver入口

package cn.tedu.kafka.streamingimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import java.util.Calendarobject Driver {def main(args: Array[String]): Unit = {//如果是本地模式,消费Kafka数据,启动的线程数至少是2个//其中一个线程负责SparkStreaming,另外一个线程负责从Kafka消费//如果只启动一个线程,则无法从kafka消费数据val conf=new SparkConf().setMaster("local[5]").setAppName("kafkastreaming")val sc=new SparkContext(conf)val ssc=new StreamingContext(sc,Seconds(5))//-指定zookeeper集群地址val zkHosts="hadoop01:2181,hadoop02:2181,hadoop03:2181"val group="gp1"//key是主题名, value是消费的线程数,可以指定多对kv对(即消费多个主题)val topics=Map("enbook"->1,"weblog"->1)//通过工具类,从kafka消费数据val kafkaStream=KafkaUtils.createStream(ssc, zkHosts, group, topics).map{x=>x._2}.foreachRDD{rdd=>//获取一个batch内的RDD数据,并转换为本地迭代器类型//迭代器里封装了一个batch内的多条数据val lines=rdd.toLocalIterator while(lines.hasNext){//每迭代一次,获取一行数据val line=lines.next()val info=line.split("\\|")//提取业务字段val url=info(0)val urlname=info(1)val uvid=info(13)val ssid=info(14).split("_")(0)val sscount=info(14).split("_")(1)val sstime=info(14).split("_")(2)val cip=info(15)val bean=LogBean(url,urlname,uvid,ssid,sscount,sstime,cip)println(bean)//-实现业务指标的查询处理//pv, uv, vv, newip, newcust//①pv:用户访问一次,则即pv=1val pv=1--②uv:独立用户数,统计当天内,//如果当前访问记录中uvid没有出现过,则即uv=1//如果uvid在当天记录已存在,则uv=//当天的范围如何定义:startTime=当天0:00的时间戳//endTime=当前记录中的sstime//把范围定义之后,就可以去HBase做范围查询val endTime=sstime.toLongval calender=Calendar.getInstance//下面的代码表示以endTime为基准.找当天的0:00calender.setTimeInMillis(endTime)calender.set(Calendar.HOUR,0)calender.set(Calendar.MINUTE,0)calender.set(Calendar.SECOND,0)calender.set(Calendar.MILLISECOND,0)//获取当天的0:0日的时间戳val startTime=calender.getTimeInMillis//查询范围定好之后,通过行键正则过滤器来匹配uvid的数据val uvidRegex="^\\d+_"+uvid+".*$"//执行HBase表查询,通过行键正则过滤器匹配val uvResult=HBaseUtil.queryByRowRegex(sc,startTime,endTime,uvidRegex)//如果uvResult.count()==0,表际此uvid在今天的记录没出现过val uv=if(uvResult.count()==0)1 else 0//3VV:独立会话数,如果在当天的范围内是新会话,则vv=1,反之vv=eval ssidRegex="^\\d+_\\d+_"+ssid+".*$"val vvResult=HBaseUtil.queryByRowRegex(sc,startTime,endTime,ssidRegex)val vv=if(vvResult.count()==0)1 else 0//newip:新增ip数,如果当前记录中的ip在历史数据中没有出现过,//才是新增ip,则newip=1,反之,为0//历史数据的范围:startTime=0, endTime=sstimeval newipRegex="^\\d+_\\d+_\\d+_"+cip+".*$"val newipResult=HBaseUtil.queryByRowRegex(sc,0,endTime,newipRegex)val newip=if(newipResult.count()==0)1 else 0//-⑤newcust:新增用户数,判断当前的uvid在历史数据中,如果没出现过,则newcust=1val newcustResult=HBaseUtil.queryByRowRegex(sc,0,endTime,uvidRegex)val newcust=if(newcustResult.count()==0)1 else 0
//                          println("pv:"+pv+"uv:"+uv+"vv:"+vv+"newip:"+newip+"newcust:"+newcust)val mysqlBean=MysqlBean(sstime.toLong,pv,uv,vv,newip,newcust)//将统计好的业务指标插入mysql数据库MysqlUtil.saveToMysql(mysqlBean)//将数据插入hbase---//ctrl+1 快捷生成方法HBaseUtil.saveToHBase(sc,bean)}}//      kafkaStream.print()ssc.start()ssc.awaitTermination()}
}

c3p0配置

<?xml version="1.0" encoding="UTF-8"?>
<c3p0-config><default-config><property name="driverClass">com.mysql.jdbc.Driver</property><property name="jdbcUrl">jdbc:mysql://hadoop01:3306/weblog</property><property name="user">root</property><property name="password">root</property></default-config>
</c3p0-config>

=====================

hbase小demo(写入)

package cn.tedu.spark.hbaseimport org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.fs.shell.find.Result
import org.apache.hadoop.hbase.client.Putobject WriteDriver {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("writeHBase")val sc=new SparkContext(conf)//设置zookeeper集群地址sc.hadoopConfiguration.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")//-设置zookeeper端口号sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort","2181")//设置写出的HBase表名sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"tb1")val job=new Job(sc.hadoopConfiguration)//指定输出key类型job.setOutputKeyClass(classOf[ImmutableBytesWritable])//指定输出value类型job.setOutputValueClass(classOf[Result])//指定输出的表类型job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])//1.准备RDD[(key , value)]//2.执行插入HBaseval r1=sc.makeRDD(List("1 tom 18","2 rose 25","3 jim 20"))val hbaseRDD=r1.map { line =>val info=line.split(" ")val id=info(0)val name=info(1)val age=info(2)//创建一个HBase 行对象,并指定行键val put=new Put(id.getBytes)//①参:列族名②参:列名③参:列值put.add("cf1".getBytes, "name".getBytes, name.getBytes)put.add("cf1".getBytes, "age".getBytes, age.getBytes)(new ImmutableBytesWritable,put)}//执行插入hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)}
}

hbase读取小demo

package cn.tedu.spark.hbaseimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Resultobject ReadDriver01 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("readHBase")val sc=new SparkContext(conf)//创建HBase环境参数对象val hbaseConf=HBaseConfiguration.create()//hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")//hbaseConf.set("hbase.zookeeper.property.clientPort","2181")//指定读取的HBase表名hbaseConf.set(TableInputFormat.INPUT_TABLE,"tb1")//①参:HBase环境参数对象②参∶读取表类型③参:输入key类型4参∶输入value//sc.newAPIHadoopRDD执行读取,并将结果返回到RDD中val result=sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])result.foreach{x=>//获取每行数据的对象val row=x._2//①参:列族名②参:列名val name=row.getValue("cf1".getBytes, "name".getBytes)val age=row.getValue("cf1".getBytes, "age".getBytes)//println(new String(name)+":"+new String(age))}//}
}
package cn.tedu.spark.hbaseimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.filter.PrefixFilterobject ReadDriver02 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("readHBase")val sc=new SparkContext(conf)//创建HBase环境参数对象val hbaseConf=HBaseConfiguration.create()//hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")//hbaseConf.set("hbase.zookeeper.property.clientPort","2181")//指定读取的HBase表名hbaseConf.set(TableInputFormat.INPUT_TABLE,"student")//创建HBase扫描对象val scan=new Scan()//设定扫描范围
//      scan.setStartRow("s99988".getBytes)
//      scan.setStopRow("s99989".getBytes)//val filter=new PrefixFilter("s9997".getBytes)//设置filter生效scan.setFilter(filter)//创建HBase前缀过滤器,下面表示匹配所有行键以s9997开头的行数据hbaseConf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))//①参:HBase环境参数对象②参∶读取表类型③参:输入key类型4参∶输入value//sc.newAPIHadoopRDD执行读取,并将结果返回到RDD中val result=sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])result.foreach{x=>val row=x._2val id=row.getValue("basic".getBytes, "id".getBytes)println(new String(id))}}
}

=========================

======================
推荐系统----实时
代码
本地数据存储到hdfs

package cn.tedu.kafka.streamingrecimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS/*** 读取样本集,建立推荐系统模型* */object Driver {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("rec")val sc=new SparkContext(conf)val data=sc.textFile("d://data/ml/u.data", 4)//RDD[String]->RDD[Rating(userId,itemId,score)]val ratings=data.map { line =>val info=line.split(" ")val userid=info(0).toIntval movieId=info(1).toIntval score=info(2).toDoubleRating(userid,movieId,score)}//建立推荐系统模型val model=ALS.train(ratings, 50, 10,0.01)//模型存储model.save(sc, "hdfs://hadoop01:9000/rec-result")}
}

spark数据处理,做出推荐

package cn.tedu.kafka.streamingrecimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.streaming.kafka.KafkaUtils
/*** 加载推荐系统模型,并接受Kafka发来的用户数据完成实时推荐* */
object LoadDriver {def cosArray(a1:Array[Double],a2:Array[Double])={val a1a2=a1 zip a2val a1a2Fenzi=a1a2.map{x=>x._1*x._2}.sumval a1Fenmu=Math.sqrt(a1.map { x => x*x }.sum)val a2Fenmu=Math.sqrt(a2.map { x => x*x }.sum)a1a2Fenzi/(a1Fenmu*a2Fenmu)}def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("spark://hadoop01:7077").setAppName("load")
//            val conf=new SparkConf().setMaster("local[5]").setAppName("load")val sc=new SparkContext(conf)val ssc=new StreamingContext(sc,Seconds(5))//加载模型val model=MatrixFactorizationModel.load(sc, "hdfs://hadoop01:9000/rec-result")//获取物品因子矩阵val movieFactors=model.productFeaturesval zkHosts="hadoop01:2181,hadoop02:2181,hadoop03:2181"//指定消费者组名val groupId="rec01"//指定消费的主题名和线程数val topics=Map("rec"->1)val kafkaStream=KafkaUtils.createStream(ssc, zkHosts, groupId, topics).map{x=>x._2}.filter { liine => liine.split(",").length==2 }.foreachRDD{rdd=>val lines=rdd.toLocalIteratorwhile(lines.hasNext){val line=lines.next()val info=line.split(",")val userId=info(0).toIntval movieId=info(1).toInt//获取浏览的电影id的因子矩阵val movieFactor=movieFactors.keyBy{x=>x._1}.lookup(movieId).head._2//计算其他电影和当前电影的夹角余弦,根据相似度大小降序排序取前六个去掉第一个val cosResults=movieFactors.map{case(id,factor)=>val cos=cosArray(movieFactor, factor)(id,cos)}val r1=cosResults.sortBy{x=> -x._2}.take(6).drop(1)val r2=model.recommendProducts(userId, 5)val result=r1.union(r2)//基于用户id,实现基于用户的推荐,推荐10部电影//课后作业,要求根据浏览的商品id,完成商品推荐//一共推荐10个商品,其中5个来自于基于用户推荐,另外5个来自于商品推荐
//                      val result=model.recommendProducts(userId, 10)result.foreach{println}}}ssc.start()ssc.awaitTermination()}
}

启动Kafka

[root@hadoop01 bin]# sh kafka-server-start.sh ../config/server.properties

创建Kafka主题

[root@hadoop01 bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic rec

启动Kafka生产者

[root@hadoop01 bin]# sh kafka-console-producer.sh --broker-list hadoop01:9092 --topic rec

生产者输入(用户id,电影id)

[root@hadoop01 bin]# sh kafka-console-producer.sh --broker-list hadoop01:9092 --topic rec
>6,6

控制台打印效果

Rating(6,22,9.586876239876736)
Rating(6,14,9.132360061338625)
Rating(6,43,8.836464366788924)
Rating(6,34,8.805655225370668)
Rating(6,81,8.75415942693994)
Rating(6,20,8.691513346899267)
Rating(6,65,8.233194907574342)
Rating(6,46,8.142833395866305)
Rating(6,79,8.038885834759451)
Rating(6,98,7.931351495727018)

有本地spark模式改为使用Linux-spark集群

 val conf=new SparkConf().setMaster("spark://hadoop01:7077").setAppName("load")

spark启动(集群)

[root@hadoop01 sbin]# pwd
/home/presoftware/spark-2.0.1-bin-hadoop2.7/sbin/
[root@hadoop01 sbin]# sh start-all.sh

导出jar包(需要放到Linux–spark服务bin目录下)

上传Kafka jar包到Linux 上spark服务 jars目录

上传自己写的代码jar包到spark bin目录

运行spark(指定自己的jar包)

[root@hadoop01 bin]# sh spark-submit --class  cn.tedu.kafka.streamingrec.LoadDriver rec.jar

Kafka生产者输入(6,6)

[root@hadoop01 bin]# sh kafka-console-producer.sh --broker-list hadoop01:9092 --topic rec
>5,6

spark控制台打印

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Rating(5,7,9.659405295873986)
Rating(5,6,9.448273620776032)
Rating(5,45,9.27702720870958)
Rating(5,9,9.150686590327457)
Rating(5,59,8.475865346386467)
Rating(5,25,8.455949950823445)
Rating(5,63,8.37547781830256)
Rating(5,51,8.303526193265274)
Rating(5,3,8.282373808159182)
Rating(5,4,8.24443957096153)

jar包记录
1.Kafka源码包粘贴(libs目录删除.asc非jar包文件所得)
2.hbase源码包粘贴(lib目录)
3.spark源码包(jars目录)

项目(day03网站流量指标统计)相关推荐

  1. 项目(day01网站流量指标统计)

    网站流量指标统计 对于网站流量指标统计,一般可以分为如下维度: 1.统计每一天的页面访问量. 2.统计每一天的独立访客数(按人头数统计) 3.统计每一天的独立会话数(Session) 收集到如上指标之 ...

  2. 项目(day02网站流量指标统计)

    Hive的占位符与文件的调用 [root@hadoop01 ~]# cd /home/presoftware/apache-hive-1.2.0-bin/bin [root@hadoop01 bin] ...

  3. 大数据千亿级离线数仓项目第五天 指标统计/导出和工作流调度

    千亿级数仓第05天讲义 课程目标 掌握常见指标PV.UV的开发统计 掌握流量常见指标分类 掌握指标数据导出 理解工作流调度流程 模块开发–数据入库 创建ODS层数据表 原始日志数据表 drop tab ...

  4. java http reffer_HTTP Referer与网站流量来路统计

    先抄别人一段话,省的我解释了: 在网站的统计数据中有对网站流量来路的统计,这些数据到底是怎么来的呢?这就要从浏览器如何访问一个网址说起了,简单点来说,当我们通过输入网址或者其他途径(点击网页中链接.地 ...

  5. 从网站流量指标开始,CSDN 如何洞察运营效果异动?丨评测来了

    最近,CSDN组织了一次<人人都能简单上手的指标平台>开箱测评活动,邀请了三位嘉宾,分别是 Kyligence 联合创始人兼 CTO 李扬.CSDN 战略合作总监闫辉.CSDN 开发云 C ...

  6. Piwik网站流量访问统计系统

    Piwik是一套基于PHP5+MySQL技术构建的开源网站访问统计系统,前身是phpMyVisites.Piwik可以给你详细的统计信息,比如网页浏览人数,访问最多的页面,搜索引擎关键词等等流量分析功 ...

  7. 大数据案例--网站流量项目(上)

    目录 一.网站流量统计项目概述 1.项目背景 2.统计指标说明 二.数据的埋点和采集 1.概述 三.项目整体架构 1.架构图 四.项目环境搭建 1.准备 2.搭建 3.字段说明 五.日志服务器-Flu ...

  8. 网站流量可视化分析--页面指标分析、访问量分析

    页面指标分析.访问量分析 Tableau文件 一.实验目的 1.掌握不同大数据可视化图示的应用范围与场景 2.掌握大数据可视化工具Tableau在具体可视化场景中的使用 二.实验内容 1.网站流量的页 ...

  9. 网站流量统计与网站访问分析

    关于网站流量统计与网站访问分析的概念辨析 网站流量统计的基本含义: 网站流量统计,是指对网站访问的相关指标进行统计,常用的网站流量统计指标包括三类:(1)网站流量指标,如在一定统计周期那网站的 ...

最新文章

  1. 机器学习特征筛选:互信息法(mutual information)
  2. c语言程序设计样板,《C语言程序设计》课程设计报告书样板.doc
  3. 数字电路时钟问题——Jitter与Skew区别
  4. linux操作命令等积累
  5. Java顺序IO性能
  6. android 广播唤醒应用,Android通过广播实现灭屏和唤醒
  7. 思科6509的详细配置---加注释
  8. smarty缓存控制
  9. ssh连不上虚拟机上linux,ssh工具连接不上 vmware linux虚拟机的处理
  10. 阿里云短信接口配置教程
  11. android金山清理扫描文件动画,[Android开发实战]金山清理大师(猎豹清理大师)一键加速快捷方式动画实现...
  12. 计算机未来设计建筑,未来设计的趋势解析,参数化设计及创意设计案例欣赏
  13. Kubernetes Pod Eviction 简介
  14. Mac电脑的效率超高的输入法,自动切换输入法
  15. 软通动力入职考试----全套
  16. 常常激励我们的36句话
  17. 图数据库查询语言Cypher、Gremlin和SPARQL
  18. NFC技术 (一) -基础介绍
  19. 计算机报名为什么说我没有在系统用户中注册,硕士研究生网上报名常见问题汇总...
  20. 生物网络中基于节点相似度的链路预测图卷积

热门文章

  1. JS中数组删除指定元素
  2. SYSAUX tablespace grows quite fast due to Apply spilling [ID 556183.1]
  3. jquery的DOM节点操作(删除元素节点)
  4. P82-前端基础动画效果-动画旋转练习鸭子表
  5. 如何才能有效缓解焦虑?看看猿辅导怎么说
  6. 用Python制作音乐播放器(上)
  7. bzoj2716 [Violet 3]天使玩偶(CDQ分治)
  8. 两台电脑上的虚拟机相互通信
  9. 渗透杂记-2013-07-13
  10. Material Design风格神框架vuetify 学习笔记(七) 基础组件3 抽屉 卡片