投资就是价值观的变现

文章目录

  • 引言
  • SparkGraphX
    • 执行过程
    • 后期优化
  • Neo4j
    • 存储
    • 结构
    • 使用
  • SparkStreaming
    • 秒级实时关系存储
    • 纬度关联+存储
  • Drools

引言

2020年年初业务稳步增长,风控方面遇到了挑战,作为C2C(类似于得物、咸鱼)交易平台,难免会遇到用户团伙进行薅平台羊毛、自买自卖的行为,风控团队将用户的历史数据整合后,希望从中圈选出各个大大小小的团伙,来防止团伙内部和团伙之间进行交易,数据量大,纬度复杂,经过几个周的磨合尝试,以及和某些小额贷款公司的交流,终于确定了一套完整的团伙发现体系,最终走向数据的实时化,其中spark的GraphX和streaming占据重要位置。接下来简述1.0版本的应用。

SparkGraphX

选择使用SparkGraphX有两点:1、数据量大,使用普通python或者java程序实现不了,顺便用spark直接读取hive的数据;
2、使用spark强大的图计算功能,进行连通图的计算等等

执行过程

1、获取数据,从hive中获取数据转化为DF(点-关系)

    val sql =s"""|select uid, fid, type from dw.user_info_xx_d where partition_date=${date}""".stripMarginval matedata: RDD[Row] = hc.sql(sql).rddmatedata.map(elem => {val lineType = elem.getAs[String](2)println(lineType)lineType match{case "充值" => (elem.getAs[Long](1).toString, elem.getAs[String](0), elem.getAs[String](2), "BindAccount")case "支付" => (elem.getAs[Long](1).toString, elem.getAs[String](0), elem.getAs[String](2), "BindAccount")case "认证" => (elem.getAs[Long](1).toString, elem.getAs[String](0), elem.getAs[String](2), "RealnameVerify")...case _ => {logger.error("========问题数据:"+elem+"================")null}}})

2、数据格式化为标准的点和边, (点集合,关系集合)

//数据格式化为点边关系
val node_rels = format_node_rels(uid_tid_rels_type)//数据格式化为GrphX标准点和边
val vertexRDD: RDD[(VertexId, PartitionID)] = node_rels._1
val edgeRDD: RDD[Edge[PartitionID]] = node_rels._2.map(elem => Edge(elem._1, elem._2._1, elem._2._2))def format_node_rels(rdd: RDD[(String, String, String, String)]): (RDD[(VertexId, PartitionID)], RDD[(VertexId, (VertexId, PartitionID))]) = {val uid_nodes: RDD[(VertexId, PartitionID)] = rdd.map(elem => (elem._1.toLong, 1)).reduceByKey(_ + _).map(elem => (elem._1, 1))//通过uid的相同关系,将uid们连接起来val tid_nodes: RDD[(VertexId, (VertexId, PartitionID))] = rdd.map(elem => (elem._2, elem._1.toLong)).groupByKey().flatMap(elem => {val list = elem._2val min = list.minprintln(list)list.map(index => {((min, index), 1)})}).filter(elem => elem._1._1 != elem._1._2).reduceByKey(_ + _).map(elem => (elem._1._1, (elem._1._2, 1)))(uid_nodes, tid_nodes)}

3、构建图

 val graph: Graph[PartitionID, PartitionID] = Graph(uid_nodes, tid_nodes)

4、构建联通关系

val gang: RDD[(VertexId, Iterable[VertexId])] = connected(graph, vertexRDD)/*** 联通图* @param graph* @param vertexRDD*/
def connected(graph: Graph[PartitionID, PartitionID], vertexRDD: RDD[(VertexId, PartitionID)]): RDD[(VertexId, Iterable[VertexId])] = {//获得连通图val ccGraph: Graph[VertexId, PartitionID] = graph.connectedComponents()val res: RDD[(VertexId, Iterable[VertexId])] = ccGraph.vertices.join(vertexRDD).map {case (uid, (gangid, num)) => (gangid, uid)}.groupByKey()res}

5、结果存储

gang.foreachPartition(rdd => {rdd.foreach(elem => {val gandId = elem._1val list = elem._2val size = list.sizeif (size > 1) {list.foreach(uid => {//用户id,所在联通图id,联通图大小println(uid, gandId, size)})})})
})

后期优化

属于2期的优化,先对不同的关系进行加权,比如两个人同一wifi、同一经纬度,这个时候权重特别低;两个人用过同一设备、同一支付账号,这钟权重比较高。再通过对联通图使用Louvain进行训练,将1期中难以拆分的大联通图进行关系优化。

Neo4j

neo4j是一个开源的图数据库(集群版不开源,按节点收费,贼贵,单节点免费),同一业务拆分了一下,能满足当前的数据需求。只做查询,不做计算,之前也考虑过计算,效率低,内存消耗高,最终选择了SparkGraphX作为计算。

存储

存储分两部分:存量数据的初始化和增量数据,具体怎么实现,存量数据的初始化看我另一篇关于Neo4j的博客,增量数据在下面的代码中有。

结构

存储结构uid–>共同关系<–uid:
1002–实名认证–>小明<–实名认证–1044或者1002–支付账号–>zfbxxxxdfs1000<–提现账号–1044

使用

提供可视化的用户关系查询,经常会遇上不知道两个人为什么被关联到一个团伙里
查询两个人之间的10度关系

MATCH p=(from:User{uid:'1002'})-[*..10]-(to:User{uid:'1044'}) RETURN p limit 100

SparkStreaming

sparkStreaming(有条件的用flink)做两件事:

秒级实时关系存储

数据源:binlog(kafka)
数据存储:neo4j、kafka、redis
处理过程:(1)、普通数据,A用户的a特征,直接存储到neo4j、kafka、redis
(2)、多重join的数据,需要两个topic进行join的,没有用直接用join,用的是窗口函数+groupByKey
窗口大小是4s钟,滑动间隔是2s钟(根据业务本身延时和数据本身延时决定,因为此动作触发后,app会有几秒的等待时间,以及多个动作后的结果,保证风控数据到位后,业务才会去调用,否则,当羊毛党都薅完羊毛了,才检测到有风险,岂不是有些扯淡,所以有些动作比较卡,不是数据拥挤导致的,也许就说程序员在里面写的sleep(n),哈哈哈)。比如uid和支付id的关联关系,需要两个表,表中关键字段分表是,(order_id, uid)(order_id, pay_id)
,先将两个topic合流,然后调出相同字段

 .map(elem => (((elem._1, elem._2), elem._3), 1)) // ((提现表关联 ,order_id),uid/pay_id).reduceByKeyAndWindow((x:Int, y:Int)=>x+y, Seconds(30), Seconds(15)).map(elem=>(elem._1._1, elem._1._2)) //.groupByKey() // (统一表名 ,订单id) => (uid+pay_id).map(elem=>{//整理数据,除了提现表关联,还有很多关联关系}).foreach(存储)

neo4j

  //更新关系private def relationShip(session: Session, uid: Int, flag_label:String, flag_type:String, fuid: String): Any = {val sql =  s"""|match (p1:User{userId:$uid})-[r:$flag_type]->(p2:$flag_label{flagId:"$fuid"}) return p1.userId as uid""".stripMarginval result: StatementResult = session.run(sql)if (!result.hasNext) {val sql1 =s"""|MERGE (p1:User{userId:$uid})|MERGE (p2:$flag_label{flagId:"$fuid"})""".stripMarginsession.run(sql1)val sql =s"""|MATCH (p1:User{userId:$uid}),(p2:$flag_label{flagId:"$fuid"})|MERGE (p1)-[r:$flag_type{score:1}]->(p2)|RETURN p1.userId as uid""".stripMarginval rel: StatementResult = session.run(sql)if (rel.hasNext) {val record = rel.next()val uid = record.get("uid").toStringprintln(uid)}else{System.err.println("error:"+uid+flag_label+flag_type+fuid)}}}/*** 获取Driver* @return*/def getDriver(): Driver = {val url = "bolt://neo4j01:8687"val user = "user"val password = "password"val driver = GraphDatabase.driver(url, AuthTokens.basic(user, password), Config.build().withMaxIdleSessions(1000).withConnectionLivenessCheckTimeout(10, TimeUnit.SECONDS).toConfig)driver}/*** 获取Session* @param driver* @return*/def getSession(driver: Driver): Session = {val session = driver.session()session}

kafka

 def resToKafka(ssc: StreamingContext, kafkaDStreamValue: DStream[(String, String, String, String)]): Unit = {//广播KafkaSinkval kafkaProducer: Broadcast[KafkaSink[String, String]] = {val kafkaProducerConfig = {val p = new Properties()p.put("group.id", "realtime")p.put("acks", "all")p.put("retries ", "1")p.setProperty("bootstrap.servers", GetPropKey.brokers)p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p}ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}//写入KafkakafkaDStreamValue.foreachRDD(rdd => {if (!rdd.isEmpty()) {rdd.foreachPartition(partition => {partition.foreach(elem => {val flag_label = elem._3if (!flag_label.equals("null")) {val auth_info = elem._1val uid = elem._2.toIntval flag_type = elem._4val value = dataJson(uid, auth_info, flag_label, flag_type)kafkaProducer.value.send("risk_user_auth_info", value)}})})}})}/*** json格式化** @param uid* @param fid* @param flag_label* @param flag_type* @return*/def dataJson(uid: Int, fid: String, flag_label: String, flag_type: String): String = {val map = Map("user_id" -> uid,"flag_id" -> fid,"flag_label" -> flag_label,"flag_type" -> flag_type)JSONObject.apply(map).toString()}

纬度关联+存储

数据源:risk_user_auth_info(1中写入kafka的延时队列)
数据存储:redis
处理过程:从延时队列中取出数据,到redis中查询出关系,归结到父节点(迭代版本中加入了算法进行合并和拆分图),再存入redis

Drools

更新中。。。

spark在风控用户团伙中的应用相关推荐

  1. Spark在不同集群中的运行架构

    Spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式.部署在单台机器上时,既可以用本地(Local)模式运行,也可以使用伪分布式模式来运行:当以分布式集群部署 ...

  2. Spark系列之Spark在不同集群中的架构

    title: Spark系列 第十二章 Spark在不同集群中的架构 ​ Spark 注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式.部署在单台机器上时,既可以用 ...

  3. 嵌入式Linux设备驱动程序:用户空间中的设备驱动程序

    嵌入式Linux设备驱动程序:用户空间中的设备驱动程序 Embedded Linux device drivers: Device drivers in user space Interfacing ...

  4. 【2012百度之星/资格赛】H:用户请求中的品牌 [后缀数组]

    时间限制: 1000ms 内存限制: 65536kB 描述 馅饼同学是一个在百度工作,做用户请求(query)分析的同学,他在用户请求中经常会遇到一些很奇葩的词汇.在比方说"johnsonj ...

  5. ORA-01436: 用户数据中的CONNECT BY 循环

    起始地     目的地     距离(公里) A             B             1000 A             C             1100 A           ...

  6. 网站推广期间如何做好用户体验中的交互体验设计?

    在当前网站推广期间必然要充分获取用户使用体验以赢得用户好感,并吸引搜索引擎关注以此获取流量和排名等内容,让用户在该网站推广期间让自身需求的解决变得简单,并给予用户深刻印象,让用户网站访问期间获得良好的 ...

  7. Windows核心编程 第八章 用户方式中线程的同步(上)

    第8章 用户方式中线程的同步 当所有的线程在互相之间不需要进行通信的情况下就能够顺利地运行时, M i c r o s o f t Wi n d o w s的运行性能最好.但是,线程很少能够在所有的时 ...

  8. 用户方式中线程的同步——Windows核心编程学习手札之八

    用户方式中线程的同步 --Windows核心编程学习手札之八 系统中所有线程都必须拥有对各种系统资源的访问权,这些资源包括内存堆栈.串口.文件.窗口和许多其他资源.如果一个线程需要独占对资源的访问权, ...

  9. 【2012百度之星/资格赛】H:用户请求中的品牌

    时间限制:  1000ms  内存限制:  65536kB 描述 馅饼同学是一个在百度工作,做用户请求(query)分析的同学,他在用户请求中经常会遇到一些很奇葩的词汇.在比方说"johns ...

最新文章

  1. 微型计算机系统没有的总线是( ),微型计算机系统总线(1).ppt
  2. SUSE团队已将重心偏向GCC 7
  3. linux怎么抓sip包,Ubuntu下使用Wireshark进行抓包分析(含SIP和RTP包)
  4. Intent 隐示意图
  5. Python + OpenCV 环境配置
  6. 陶老师ESD、EMI、EMC讲座
  7. 江苏计算机二级c语言考试范围,江苏省计算机二级C语言考试大纲
  8. 2023王道C语言训练营(二叉查找树-顺序查找-折半查找)
  9. CreatePipe、CreateProcess函数
  10. iOS高仿app源码:10天时间纯代码打造高仿优质《内涵段子》
  11. Wagtail 教程 1 :基础设置
  12. iGoogle自定义
  13. MarkDown编辑器----小书匠
  14. git 删除历史commit
  15. Enzo高灵敏度检测——Arg8-Vasopressin ELISA kit
  16. 用字符数组输出平行四边形
  17. Fusion 360 最新动态 - 温度场和热应力分析
  18. 接入层、汇聚层、核心层交换机三者之间的功能详解
  19. 重来之大学版|社交生活篇——失恋了怎么办?失恋了很难受怎么办?如何走出失恋的痛苦?我失恋了该怎么办?如何从失恋的痛苦中走出来?
  20. properties文件

热门文章

  1. 面向对象和面向过程之间的区别以及优缺点
  2. 华为服务器mib文件升级固件,SNMP MIB 固 件 升 级
  3. swust oj 254 翻煎饼
  4. Ubuntu安装 pip3使用代理安装包
  5. 冒泡和快速排序的时间复杂度_十大经典排序算法——快速排序
  6. c#取小数点后三位_VB.NetC#-取小数点后几位但不四舍五入的方法
  7. 用PyOpenGL/OpenGL高速(异步)保存像素到内存
  8. 英语学习详细笔记(十九)介系词
  9. jenkins安装、启动报错、卸载问题
  10. Mysql-创建数据库和数据表时指定编码格式