The IP geolocation database used by the code below can be downloaded from http://www.ipip.net/download.html
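Before any lookups, the downloaded 17monipdb.dat file has to be loaded into memory. The sketch below shows a standalone lookup, reusing the IPInfo.IP helper class that the full listing further down relies on; the exact shape of the string returned by mains (a bracketed, comma-separated region list) is an assumption inferred from how logprovincecity post-processes it, and the database path is a placeholder.

import IPInfo.IP // same ipip.net helper wrapper as in the full listing below

object IpLookupSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder path: point this at the 17monipdb.dat downloaded from ipip.net
    IP.load("/path/to/17monipdb.dat")
    // mains() is assumed to return something like "[province,city,country]"
    val raw = new IP().mains("182.146.100.97")
    // Strip the surrounding brackets and split on commas, as logprovincecity() does
    val region = raw.substring(1, raw.length - 1).split(",")
    println(region.mkString(" / "))
  }
}

The job itself parses CDN access-log lines like the following sample: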

 182.146.100.97 - 3 [03/Jan/2017:23:30:01 +0800] "GET http://7xna64.com2.z0.glb.qiniucdn.com/Fq9M_Gn0RRWy9eprb0T0CAdrybv3.jpg?imageView2/2/w/1080/h/1920&e=1483592399&token=Q-hCY0VbL4F6NTX3TgRvE_T3vcpNEo2Gr3S9RA-b:HJPKZifauy-LOmjJgA5F1uG9ibs= HTTP/1.1" 200 219736 "-" "Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+NX549J+Build/MMB29M)"
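The parser in the listing below splits each line on single spaces. As a minimal REPL-style sketch of the field layout (the value names mirror the parses method further down; rawTime and url are only illustrative):

val line = "182.146.100.97 - 3 [03/Jan/2017:23:30:01 +0800] \"GET http://7xna64.com2.z0.glb.qiniucdn.com/Fq9M_Gn0RRWy9eprb0T0CAdrybv3.jpg?imageView2/2/w/1080/h/1920&e=1483592399&token=Q-hCY0VbL4F6NTX3TgRvE_T3vcpNEo2Gr3S9RA-b:HJPKZifauy-LOmjJgA5F1uG9ibs= HTTP/1.1\" 200 219736 \"-\" \"Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+NX549J+Build/MMB29M)\""

val as = line.split(" ")
val ip      = as(0)         // 182.146.100.97
val restime = as(2).toInt   // 3, the response time
val rawTime = as(3)         // [03/Jan/2017:23:30:01  (the leading "[" is stripped in parseVisitTime)
val url     = as(6)         // the requested URL, between "GET" and "HTTP/1.1"
val code    = as(8).toInt   // 200
val size    = as(9).toLong  // 219736 bytes
// The User-Agent is the last double-quoted field of the line
val ua = line.substring(line.lastIndexOf(" \"") + 2, line.lastIndexOf("\""))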

Code example

import java.security.MessageDigest
import java.text.SimpleDateFormat
import java.util.{Locale, Properties}

import IPInfo.IP
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * Created by sicong on 2017/4/19.
 * Sample log line:
 * 182.146.100.97 - 3 [03/Jan/2017:23:30:01 +0800] "GET http://7xna64.com2.z0.glb.qiniucdn.com/Fq9M_Gn0RRWy9eprb0T0CAdrybv3.jpg?imageView2/2/w/1080/h/1920&e=1483592399&token=Q-hCY0VbL4F6NTX3TgRvE_T3vcpNEo2Gr3S9RA-b:HJPKZifauy-LOmjJgA5F1uG9ibs= HTTP/1.1" 200 219736 "-" "Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+NX549J+Build/MMB29M)"
 */
object paseLogData {

  val prop = new Properties()
  prop.put("user", "root")
  prop.put("password", "")

  case class Record(user: String, ip: String, country: String, province: String, city: String,
                    restime: Int, time: Long, code: Int, size: Long,
                    firm: String, device: String, rom: String, ke: String)
  case class Devices(city: String, num: Int, device: String)
  case class Ipmap(ip: String, provinceCode: Int, cityCode: Int, province: String, city: String)
  case class CityFlow(city: String, flow: Long)
  case class StatusCode(code: Int, num: Int)
  case class ThreadCache(dateParser: SimpleDateFormat, sha1Digester: MessageDigest)

  val threadSafeCache = new ThreadLocal[ThreadCache]()
  val Iphelpk = new IP()

  def getIpInfohga(Str: String): String = {
    Iphelpk.mains(Str)
  }

  // Main entry point of the job
  def logbegin(): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL Example")
      .master("local[4]")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //    readsp(spark)
    parseLog("/Users/sicong/Downloads/yitianyike.txt", spark)
  }

  def getNeedParseLog(): Array[String] = {
    // TODO
    // 1. current time minus 7 hours; 2. current time minus 12 hours;
    // use 1 and 2 as the time window to query the list of log files;
    // compare that list against recent processing records, parse any log
    // that has not been processed yet, and mark it as processed on success
    Array("/Users/Simon/Downloads/7xna64.com2.z0.glb.qiniucdn.com_2017-01-03-23_0602")
  }

  def logdevicesMysql(kk: Dataset[Devices], s: String): Unit = {
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "")
    kk.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/test1?useUnicode=true&characterEncoding=utf8", s, prop)
  }

  // Aggregate traffic (in KB) per city and write the result to MySQL
  def CizeFlowStatus(logrdd: RDD[Record], sparkSession: SparkSession) = {
    import sparkSession.implicits._
    val dataOfFlow = logrdd.map(x => (x.city, x.size))
      .groupByKey()
      .map(x => (x._1, (x._2.sum.toDouble / 1024).round))
    val logMysqldata = dataOfFlow.flatMap(x => Seq(CityFlow(x._1, x._2))).toDS()
    logMysqldata.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/test1?useUnicode=true&characterEncoding=utf8", "test1.CityFlow", prop)
  }

  // Count the occurrences of each HTTP status code and write the result to MySQL
  def HttpStatusCode(logrdd: RDD[Record], sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._
    val logMysqldata = logrdd.map(x => (x.code, x))
      .groupByKey()
      .flatMap(x => Seq(StatusCode(x._1, x._2.size)))
      .toDS()
    logMysqldata.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/test1?useUnicode=true&characterEncoding=utf8", "test1.StatusCode", prop)
  }

  // Rank (URL key + city) combinations by request count and print them
  def cityTopURL(logrdd: RDD[Record], sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._
    logrdd.map(x => (x.ke + x.city, x))
      .groupByKey()
      .map(x => (x._2.size, x._1))
      .sortBy(x => x, false, 1)
      .foreach(x => println(x))
  }

  // Spark parsing entry point
  def parseLog(url: String, spark: SparkSession): Unit = {
    import spark.implicits._
    val peopleDF = spark.sparkContext.textFile(url)
    val logrdd = peopleDF.flatMap(line => {
      // NOTE: parses never returns null; a malformed line will throw instead
      val record = parses(line)
      if (record != null) {
        Seq(record)
      } else {
        Seq()
      }
    })
    // Cache logrdd in memory; otherwise every action below would recompute
    // the whole lineage from the beginning
    logrdd.cache()
    // Traffic per city/province
    CizeFlowStatus(logrdd, spark)
    // Share of each HTTP status code
    HttpStatusCode(logrdd, spark)
    cityTopURL(logrdd, spark)
  }

  // Look up the region an IP address belongs to
  def logprovincecity(str: String): Array[String] = {
    val Iphelp = new IP()
    val data = Iphelp.mains(str)
    data.substring(1, data.length - 1).split(",")
  }

  // Parse one raw access-log line into a Record
  def parses(line: String): Record = {
    setThreadCache()
    val as = line.split(" ")
    val ip = as(0)
    val restime = as(2).toInt
    val time = parseVisitTime(as(3))
    val code = as(8).toInt
    val size = as(9).toLong
    // Extract the UA, e.g. Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+vivo+Y55A+Build/MMB29M)
    val ua = line.substring(line.lastIndexOf(" \"") + 2, line.lastIndexOf("\""))
    val region = logprovincecity(ip)
    val province = changeEncodeing(region(0))
    val city = changeEncodeing(region(1))
    val country = changeEncodeing(region(2))
    val driver = parseUa(ua)
    val firm = driver._1
    val device = driver._2
    val rom = driver._3
    val user = mixtureUser(ip, ua)
    val ke = parseToKey(as(6))
    val obj = Record(user, ip, country, province, city,
      restime, time, code, size,
      firm, device, rom, ke)
    obj
  }

  def changeEncodeing(string: String): String = {
    string
  }

  // Parse "[03/Jan/2017:23:30:01" into a Unix timestamp in seconds
  def parseVisitTime(string: String): Long = {
    val timeData = string.substring(1, string.length)
    val loc = new Locale("en")
    val fm = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", loc)
    val dt2 = fm.parse(timeData)
    val dates = dt2.getTime()
    // drop the millisecond part to get epoch seconds
    dates.toString.substring(0, dates.toString.length - 3).toLong
  }

  def setThreadCache(): Unit = {
    val cache = threadSafeCache.get()
    if (cache == null) {
      val dateParser = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZZZ")
      val sha1 = MessageDigest.getInstance("SHA1")
      threadSafeCache.set(ThreadCache(dateParser, sha1))
    }
  }

  private val Iphelp = new IP()

  def getIpInfo(Str: String): String = {
    Iphelp.mains(Str)
  }

  // Example UA strings seen in the logs:
  // AndroidDownloadManager/5.1.1+(Linux;+U;+Android+5.1.1;+OPPO+R9+Plusm+A+Build/LMY47V)
  // Dalvik/2.1.0+(Linux;+U;+Android+5.1.1;+NX529J+Build/LMY47V)
  // Dalvik/2.1.0+(Linux;+U;+Android+5.1.1;+NX523J_V1+Build/LMY47V)
  // Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+vivo+Y55A+Build/MMB29M)
  // AndroidDownloadManager/5.1+(Linux;+U;+Android+5.1;+OPPO+R9m+Build/LMY47I)
  // The UA field may also contain other values:
  // -
  // Java/1.7.0_09
  // Go-http-client/1.1
  // VAYXXLWZIKRFDGFHPOXDNHJTDLTNBTV
  // Expected result, e.g. ("Android 6.0.1", "vivo Y55A", "Build/MMB29M")
  def parseUa(ua: String): (String, String, String) = {
    try {
      val t1 = ua.split(";").reverse
      val t2 = t1(0).split("\\+")
      (t1(1).replaceAll("\\+", " ").trim,
        t2.slice(0, t2.length - 1).mkString(" ").trim,
        t2(t2.length - 1))
    } catch {
      case e: Exception =>
        ("Error", "Error", "Error")
    }
  }

  def mixtureUser(ip: String, ua: String) = {
    hash(ip + ":" + ua)
  }

  def hash(s: String): String = {
    threadSafeCache.get().sha1Digester.digest(s.getBytes).map("%02x".format(_)).mkString
  }

  def parseToKey(url: String) = {
    // "https://a" has at least 9 characters, so the search can start at index 9
    val l = url.indexOf("?", 9)
    val end = if (l > 0) l else url.length()
    url.substring(url.indexOf("/", 9) + 1, end)
  }

  def getIpInfos(Str: String): Array[String] = {
    //    val hell = new hello();
    //    hell.getipdata(Str).split(";")
    Array()
  }

  def readsp(spark: SparkSession): Unit = {
    import spark.implicits._
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "")
    val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "test1.tutorials_tbl", prop)
    jdbcDF2.foreach(x => println(x))
  }

  def main(args: Array[String]): Unit = {
    // Load the ipip.net IP database
    IP.load("/Users/sicong/scalas/17monipdb.dat")
    logbegin()
  }
}
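
The UA and URL-key helpers can be exercised in isolation, without Spark, MySQL, or the IP database. A small sanity-check sketch (the ParseCheck object is hypothetical, not part of the original post; the inputs come from the sample line above):

object ParseCheck {
  def main(args: Array[String]): Unit = {
    // (OS version, device, ROM build) extracted from the User-Agent
    println(paseLogData.parseUa("Dalvik/2.1.0+(Linux;+U;+Android+6.0.1;+NX549J+Build/MMB29M)"))
    // Object key of the requested URL with the query string stripped
    // -> Fq9M_Gn0RRWy9eprb0T0CAdrybv3.jpg
    println(paseLogData.parseToKey("http://7xna64.com2.z0.glb.qiniucdn.com/Fq9M_Gn0RRWy9eprb0T0CAdrybv3.jpg?imageView2/2/w/1080/h/1920"))
  }
}

Note that, as written, parseUa keeps the closing parenthesis on the ROM field (e.g. "Build/MMB29M)"); trimming it would be a small follow-up fix.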

