版本:

scala2.11.8

spark-1.6.2-bin-hadoop2.6

hadoop2.6

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kejin</groupId><artifactId>dataload</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- Spark的依赖引入 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId></dependency><!-- 引入Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></dependency><!-- 加入MongoDB的驱动 --><dependency><groupId>org.mongodb</groupId><artifactId>casbah-core_2.11</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>2.0.0</version></dependency><!-- 引入Spark相关的Jar包 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.11.8</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.11.8</version></dependency></dependencies></project>

DataLoader.scala:

package com.kejin.recommenderimport com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Product数据集* 3982                            商品ID* Fuhlen 富勒 M8眩光舞者时尚节能    商品名称* 1057,439,736                    商品分类ID,不需要* B009EJN4T2                      亚马逊ID,不需要* https://images-cn-4.ssl-image   商品的图片URL* 外设产品|鼠标|电脑/办公           商品分类* 富勒|鼠标|电子产品|好用|外观漂亮   商品UGC标签*/
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )/*** Rating数据集* 4867        用户ID* 457976      商品ID* 5.0         评分* 1395676800  时间戳*/
case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )/*** MongoDB连接配置* @param uri    MongoDB的连接uri* @param db     要操作的db*/
case class MongoConfig( uri: String, db: String )object DataLoader {// 定义数据文件路径val PRODUCT_DATA_PATH = "E:\\myworkspace\\dataload\\src\\main\\resources\\products.csv"val RATING_DATA_PATH = "E:\\myworkspace\\dataload\\src\\main\\resources\\ratings.csv"// 定义mongodb中存储的表名val MONGODB_PRODUCT_COLLECTION = "Product"val MONGODB_RATING_COLLECTION = "Rating"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://192.168.31.53:27017/jeesun","mongo.db" -> "jeesun")// 创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")// 创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// 加载数据val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)val productDF = productRDD.map( item => {// product数据通过^分隔,切分出来val attr = item.split("\\^")// 转换成ProductProduct( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )} ).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)val ratingDF = ratingRDD.map( item => {val attr = item.split(",")Rating( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )} ).toDF()implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )storeDataInMongoDB( productDF, ratingDF )spark.stop()}def storeDataInMongoDB( productDF: DataFrame, ratingDF: DataFrame )(implicit mongoConfig: MongoConfig): Unit ={// 新建一个mongodb的连接,客户端val mongoClient = MongoClient( MongoClientURI(mongoConfig.uri) )// 定义要操作的mongodb表,可以理解为 db.Productval productCollection = mongoClient( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )val ratingCollection = mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )// 如果表已经存在,则删掉productCollection.dropCollection()ratingCollection.dropCollection()// 将当前数据存入对应的表中productDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_PRODUCT_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()// 对表创建索引productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )mongoClient.close()}
}

电商推荐系统-数据加载相关推荐

  1. 关于 SAP 电商云首页加载时触发的 OCC API 请求

    当我访问本地启动的 SAP Spartacus Storefront 时,在 Chrome 开发者工具里观察到总共 23 个 OCC API 调用: http://localhost:4200/ele ...

  2. 3.创建项目并初始化业务数据(电商推荐系统)

    文章目录 一.在IDEA中创建maven项目 1.项目框架搭建 2.声明项目中工具的版本信息 3.添加项目依赖 二.数据加载准备 1.Products数据集 2.Ratings数据集 3.日志管理配置 ...

  3. 电商大数据项目(二)-推荐系统实战之实时分析以及离线分析

    电商大数据项目-推荐系统实战(一)环境搭建以及日志,人口,商品分析 https://blog.51cto.com/6989066/2325073 电商大数据项目-推荐系统实战之推荐算法 https:/ ...

  4. 视频教程-企业级电商大数据推荐系统实战-大数据

    企业级电商大数据推荐系统实战 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰富的企业应 ...

  5. 电商大数据项目-推荐系统实战之推荐算法(三)

    电商大数据项目-推荐系统实战(一)环境搭建以及日志,人口,商品分析 https://blog.51cto.com/6989066/2325073 电商大数据项目-推荐系统实战之推荐算法 https:/ ...

  6. 【大数据实战电商推荐系统】

    文章目录 第1章 项目体系框架设计 第2章 工具环境搭建 第3章 项目创建并初始化业务数据 3.1 IDEA创建Maven项目(略) 3.2 数据加载准备(说明书) 3.3 数据初始化到MongoDB ...

  7. 电商大数据项目-推荐系统实战(一)

    电商大数据项目-推荐系统实战(一)环境搭建以及日志,人口,商品分析 https://blog.51cto.com/6989066/2325073 电商大数据项目-推荐系统实战之推荐算法 https:/ ...

  8. python电商项目介绍_电商大数据项目-推荐系统实战(一)

    本项目是基于Spark MLLib的大数据电商推荐系统项目,使用了scala语言和java语言.基于python语言的推荐系统项目会另外写一篇博客.在阅读本博客以前,需要有以下基础: 1.linux的 ...

  9. 大数据之电商推荐系统

    #大数据之电商推荐系统# 项目系统架构 数据整理 商品数据 商品ID 商品名称 商品种类 商品图片URL 商品标签 productId name categories imageUrl tags 评分 ...

最新文章

  1. windows创建定时任务执行python脚本
  2. C#中实现计时器功能(定时任务和计时多长时间后执行某方法)
  3. 3.6.6 码点与代码单元
  4. 实现IButtonControl接口时, PerformClick()方法的代码怎么写,请高手指导!
  5. MongoDB的增删改查
  6. 为什么要网页模块化?
  7. datax底层原理_Datax 任务分配原理
  8. cpu控制器如何工作
  9. 【积跬步以至千里】Windows无法访问指定设备,路径或文件,您可能没有合适的权限访问
  10. 移动应用的全新方式:超级app+轻应用
  11. 4G模块 | 基于4G Cat.1的内网穿透实践
  12. RuntimeError: Expected object of type torch.cuda.LongTensor but found type torch.cuda.IntTensor
  13. ArcGIS基础实验操作100例--实验13 数字化面图形的技巧
  14. MPEG-2 PS流
  15. 《向上生长》读书摘记
  16. 金立e3t刷android4.4,金立E3T刷机包 基于百度云官方ROM 深度精简 超强权限 省电耐用 安全稳定...
  17. Argo-DCS数据传输笔记
  18. window.navigator详解和使用场景
  19. excel用条件格式设置隔行变色
  20. 什么是环境变量?环境变量配置,jdk8的path环境变量配置,各个版本下jdk结构的变化以及环境变量配置的阐述,java命令执行三个不同目录下的java.exe说明

热门文章

  1. opencv 简单美颜效果
  2. 学习笔记Day02——Linux基础2——系统安装及基础知识
  3. UEFI原理与编程(四)(dec dsc inf文件)
  4. 基于单片机超声波测距系统设计
  5. python字典拷贝_Python字典(dict)拷贝
  6. 非常详细的51单片机引脚介绍
  7. java中蛇的属性有哪些_Java Swing中的蛇游戏-我的蛇只会长成
  8. wireshark 筛选diameter消息
  9. Apache skywalking分布式追踪系统
  10. Docker安装和使用