一、what is pmml?

  1. PMML是数据挖掘的一种通用的规范,它用统一的XML格式来描述机器学习的模型。无论模型是sklearn,R还是Spark MLlib生成的,都可以将其转化为标准的XML格式来存储。当我们需要将这个PMML的模型用于部署的时候,可以使用目标环境的解析PMML模型的库来创建模型对象,然后进行预测。
  2. pmml代表的是已经训练好的模型。

二、使用pmml预测数据的原理

须知:

{1} jvm平台有jpmml库来提供api。spark有jpmml-spark,是对jpmml的进一步封装。

jpmml分为3部分,pmml-evaluator、pmml-evaluator-extension、pmml-model,其中pmml-model在>spark2.0中集成,但使用时需要exclude

{2} pmml中只有<MiningSchema> / <MiningField><Output> / <Outputfield>来设置输出的列。从层级上MiningField是Outputfield的上级,MiningField叫targetField,Outputfield叫outputField。如果transformer的explode设置为false,那么会拼接为一个复杂的上下级的数据结构,比如df中的struct类型(这种类型保存为csv时会报错,需要转成string)。

import org.apache.spark.sql.functions.col
resultDF.select(col(列名).cast("string"))

1. 通过pmml创建evaluator对象。这个对象用来进行对数据进行预测。创建时可以通过setVisitors设置pmml中一些标签的过滤,因为有些数据可能在某些任务中用不到,可以减少内存。

val builder: LoadingModelEvaluatorBuilder = new LoadingModelEvaluatorBuilder().setLocatable(false)/*裁剪pmml,将pmml中的一些数据过滤,DefaultVisitorBattery是加载所有。可以自定义。*/.setVisitors(new DefaultVisitorBattery()).load(inputStream)

2.再通过evaluator创建transformer对象,这个对象用来预测1整个df中的数据,是对evaluator的封装,把源df转成目标df,可以设置输出df的列。注意的是输出的df包含源df的所有列,而源df是不能filter的,这样预测就不准了,所以只能在目的地df中进行filter。可以使用df的select方法选择要输出的列,一般是把源的主键列和所有预测列输出。

  1. 结果df中列的顺序同这些方法的设置顺序一致
  2. 列名都可以自定义,可以用来进行筛选
method fun
withTargetCols() 在结果df中设置默认的target列
withLabelCol(“customLabel”) 在结果df中设置自定义的target列,值同默认target的相同
withOutputCols() 在结果df中添加默认的output列
withProbabilityCol(“customLabelProb”) 在结果df中添加自定义名称的output列,值同默认的output列相同
exploded(true) 因为结果是2层结构,所以第二层是否跟第一层平级
val pmmlTransformerBuilder = new TransformerBuilder(evaluator).withTargetCols() //label.withLabelCol("customLabel").withOutputCols() //outField.withProbabilityCol("customLabelProb")/*结果列是否exploded,如果为false,会在label下设置outField的复杂类型。保存为csv文件时不支持直接输出这种复杂的struct类型,会报错,需要手动转成string类型*/.exploded(true)

  1. 如果exploded(false),则所有的结果列都会放在1个pmml列下面,默认的列名是pmml,且这个列名无法修改

三、spark使用pmml预测数据的API和usage

须知:

  • spark官方没有提供这个功能,要使用开源的第三方包,按功能分为2类:
  • 使用sparkml进行数据分析:https://blog.csdn.net/vv545454/article/details/85545126

1.使用pmml预测数据:

1.1 JPMML-Evaluator-Spark: https://github.com/jpmml/jpmml-evaluator-spark

1.2 项目依赖和打包插件,此处有坑

{1} 根据官网,JPMML-Evaluator library依赖的JPMML-Model和Guava和spark、Hadoop冲突,要使用maven的shade插件将2者的依赖划分到不同的命名空间,避免同集群上的冲突

 如果不用shade,会报莫名其妙的错,比如unexpected element (uri:"http://www.dmg.org/PMML-4_3", local:"PMML")

{2} 因为sparkmllib2.2.0也依赖了pmml-model,并且版本很低,是1.2.15。这个model会跟jpmml-evaluator-spark的代码冲突,会报error: Class org.jpmml.model.VisitorBattery not found。maven中,即使把sparkmllib给provided了,如果有别的工件依赖了sparkmllib里的jar,那这个jar也会被打包进去,jpmml-evaluator-spark依赖的是1.4.8的model,但maven打包时,如果没有显式的指定,会找第一个,所以要在provided的mllib中exclude pmml-model:

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_${scala_spark.version}</artifactId><version>${spark.version}</version><scope>${provided.scope}</scope><exclusions><exclusion><groupId>org.jpmml</groupId><artifactId>pmml-model</artifactId></exclusion></exclusions>
</dependency>

{3} 经测试,还要显式的导入pmml-evaluator,pmml-model可以不导,导也不会报错。

jpmml-evaluator-spark自带的evaluator是1.4.6的,自带的pmml-model是1.4.8的,而显式导入的evaluator是1.4.7的,猜测可能跟evaluator版本有关。

 最终的pom:
<dependencies><!--mllib依赖core和sql--><!-- <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala_spark.version}</artifactId><version>${spark.version}</version><scope>${provided.scope}</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala_spark.version}</artifactId><version>${spark.version}</version><scope>${provided.scope}</scope></dependency>--><!--sparkml--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_${scala_spark.version}</artifactId><version>${spark.version}</version><scope>${provided.scope}</scope><exclusions><exclusion><groupId>org.jpmml</groupId><artifactId>pmml-model</artifactId></exclusion></exclusions></dependency><!--PMML-Spark--><dependency><groupId>org.jpmml</groupId><artifactId>jpmml-evaluator-spark</artifactId><version>1.2.2</version></dependency><dependency><groupId>org.jpmml</groupId><artifactId>pmml-evaluator</artifactId><version>1.4.7</version></dependency><!--pmml-model导不导都可以--><!-- <dependency<groupId>org.jpmml</groupId><artifactId>pmml-model</artifactId><version>1.4.8</version></dependency>-->
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${project.artifactId}-shade-${project.version}</finalName><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><!--下面2个<relocations>都可以,第2个更精确--><!--此处是为了避免跟集群上的guava和pmml-model冲突--><relocations>   <relocation><pattern>com.google</pattern><shadedPattern>com.shade.google</shadedPattern></relocation><relocation><pattern>org.jpmml</pattern><shadedPattern>org.shade.jpmml</shadedPattern></relocation><relocation><pattern>org.dmg</pattern><shadedPattern>org.shade.dmg</shadedPattern></relocation></relocations><!--此处是为了避免跟集群上的guava和pmml-model冲突--><relocations><!--guava--><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>com.google.thirdparty.publicsuffix</pattern><shadedPattern>com.shade.google.thirdparty.publicsuffix</shadedPattern></relocation><!--pmml-model--><relocation><pattern>org.jpmml.model</pattern><shadedPattern>org.shade.jpmml.model</shadedPattern></relocation><relocation><pattern>org.dmg</pattern><shadedPattern>org.shade.dmg</shadedPattern></relocation></relocations></configuration></execution></executions></plugin><plugin><!--scala原始在sbt(类似java maven)上做开发,现可以用这个插件来在maven中进行开发--><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><!--<include>用来设置源--><include>**/*.scala</include></includes></configuration></execution></executions></plugin></plugins>
</build>

3.代码

 private def pmmlPredict(inputStream: FSDataInputStream, df: DataFrame,colmnName:String): DataFrame = {/**  根据pmml文件,使用sparkmllib构建模型*/val builder: LoadingModelEvaluatorBuilder = new LoadingModelEvaluatorBuilder().setLocatable(false)/*裁剪pmml,将pmml中的一些数据过滤,DefaultVisitorBattery是加载所有。可以自定义。*/.setVisitors(new DefaultVisitorBattery()).load(inputStream)val evaluator: ModelEvaluator[_] = builder.build()// Performing a self-check (duplicates as a warm-up)//evaluator.verify()val pmmlTransformerBuilder = new TransformerBuilder(evaluator).withTargetCols() //label.withOutputCols() //outField/*结果列是否exploded,如果为false,会在label下设置outField的复杂类型。csv文件不支持直接输出这种复杂的struct类型,会报错,需要手动转成string类型*/.exploded(true)val transformer: Transformer = pmmlTransformerBuilder.build()/*1.java的集合必须隐式转换为scala中的容器才能使用scala的方法2.map的返回值就是1个List3.`_`直接用,不用在前面加`_ =>`,这样反而会报错4.TargetField.getFieldName返回的是FieldName类型,还要toString*/import scala.collection.JavaConversions._import org.apache.spark.sql.functions.colval targetFields = evaluator.getTargetFields.toList.map(_.getFieldName.toString)val outputFields = evaluator.getOutputFields.toList.map(_.getFieldName.toString)/*预测select方法:1.select不能连续调用多次,只能在1个里面把列设置全2.输入String类型时,args0必须先设置1个列,然后后面再加可变参数。注意不能是带逗号的字符串,那样会把整体当成一列,会报错。3.参数是column类型时,可以传入一个List[Column]然后`:_*`。不需要设个args04.把String转成Column的方法,可以直接调用col,但要导入隐式转换。import org.apache.spark.sql.functions.col//超过2个方法时,`_`就不能用了,就要设置变量。val outputFields1 = evaluator.getOutputFields.toList.map(old => col(old.getFieldName.toString))val resultDF1: DataFrame = transformer.transform(df).select(col(colmnName) :: targetFields ::: outputFields : _ *)*/val resultDF: DataFrame = transformer.transform(df).select(colmnName,targetFields ::: outputFields : _ *)resultDF.show(false)resultDF}

4.提交的命令

spark2-submit --master yarn --deploy-mode client  \
--class com.cib.dqms.SparkWithPMMLAndDQMS \
--jars /opt/test/spark_pmml_dqms-1.0-SNAPSHOT-shade.jar \
/opt/test/spark_pmml_dqms-1.0-SNAPSHOT.jar   \
/user/spark/test.pmml dctest.test \
1 \
qweqwe \
将spark的模型对象转为pmml:
  1. JPMML-SparkML https://github.com/jpmml/jpmml-sparkml

三、

  1. df预测前后分区数不会变。

Spark整合PMML的实践相关推荐

  1. Spark在美团的实践

    本文已发表在<程序员>杂志2016年4月期. 前言 美团是数据驱动的互联网服务,用户每天在美团上的点击.浏览.下单支付行为都会产生海量的日志,这些日志数据将被汇总处理.分析.挖掘与学习,为 ...

  2. 大数据真实案例:Spark在美团的实践

    美团是数据驱动的互联网服务,用户每天在美团上的点击.浏览.下单支付行为都会产生海量的日志,这些日志数据将被汇总处理.分析.挖掘与学习,为美团的各种推荐.搜索系统甚至公司战略目标制定提供数据支持.大数据 ...

  3. Spark整合Kafka小项目

    SparkStreaming与kafka整合小项目实践含所有代码带详细注释 总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从ka ...

  4. phoenix+hbase+Spark整合,Spark处理数据操作phoenix入hbase,Spring Cloud整合phoenix

    1 版本要求 Spark版本:spark-2.3.0-bin-hadoop2.7 Phoenix版本:apache-phoenix-4.14.1-HBase-1.4-bin HBASE版本:hbase ...

  5. spark整合MySQL

    spark整合MySQL <dependency><groupId>mysql</groupId><artifactId>mysql-connector ...

  6. Spark 大数据处理最佳实践

    开源大数据社区 & 阿里云 EMR 系列直播 第十一期 主题:Spark 大数据处理最佳实践 讲师:简锋,阿里云 EMR 数据开发平台 负责人 内容框架: 大数据概览 如何摆脱技术小白 Spa ...

  7. Spark整合ElasticSearch

    2019独角兽企业重金招聘Python工程师标准>>> spark整合elasticsearch两种方式 1.自己生成_id等元数据 2.使用ES默认生成 引入对应依赖 <de ...

  8. Spark 整合ElasticSearch

    Spark 整合ElasticSearch 因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例. ...

  9. spark整合hive

    目录 spark-shell整合 安装hive 配置信息 启动spark 测试 idea中spark整合 windows下搭建hadoop 配置环境变量 添加文件 idea连接虚拟机 连接文件 连接虚 ...

最新文章

  1. html5动态气泡效果6,[jQuery]Canvas气泡动态背景效果
  2. fastText中常见问题汇总
  3. c语言字母输出什么意思,C语言中字符的输入输出以及计算字符个数的方法详解...
  4. Kinect开发学习笔记之(六)带游戏者ID的深度数据的提取
  5. 阿里 异构数据 mysql_异构数据库迁移
  6. MyBatis 拦截器 (实现分页功能)
  7. 杰里之混响音效调试【篇】
  8. 计算机画图虚线,天正建筑怎么把直线变成虚线
  9. 《我们终将遇到爱与孤独》
  10. 使用WebSocket实现多人实时聊天
  11. 微信小程序 获取用户昵称 头像 性别...『并解决获取的头像模糊问题』
  12. 房价基本上决定于货币政策.----看中国货币发行量增长
  13. 涂鸦智能三明治音视频核心板(BK7256)开箱测评
  14. M1 ARM版miniforge安装与移除TensorFlow和pytorch环境
  15. 权限控制框架 shiro
  16. Au 效果器详解:参数均衡器
  17. 临近毕业,2020春招困惑你的十大问题,你中招了吗?
  18. HyperMesh Notes
  19. python 代理ip群发邮件1000人_python 群发邮件数量限制_qq邮箱群发邮件的数量和速度限制是多少?...
  20. 图像局部特征学习(笔记1之SUSAN角点检测)

热门文章

  1. c++ lambda函数
  2. 硬件工程师基础知识1
  3. 100种思维模型之微观经济学思维模型-88
  4. python 全栈开发,Day1(python介绍,变量,if,while)
  5. Java微服务组件Spring cloud ribbon源码分析
  6. 测试驱动开发-TDD(1)
  7. mysql学习-创建股票行情数据库(1)
  8. springboot完成拉取微信公众号关注列表并通过列表推送消息
  9. date的oracle格式,Oracle的默认date格式是YYYY-MM-DD,为什么?
  10. NVIDIA显卡(GPU)性能参数一览表