Using Spark SQL from PySpark to read and write MySQL data. The program is saved as test.py.

I. Preparation

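# -*- coding: UTF-8 -*-
import sys

# Set Python's default encoding (only needed, and only possible, on Python 2)
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding('utf-8')

# Initialize Spark
from pyspark.sql import SQLContext, SparkSession, Row
spark = SparkSession.builder.master('local').appName('sparkSql').getOrCreate()
# SQLContext takes a SparkContext as its first argument; it is kept for the sqlContext.sql examples
sqlContext = SQLContext(spark.sparkContext, spark)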

II. Connecting to the MySQL database

# Database connection settings (fill in user and password)
prop = {'user':'','password':'','driver':'com.mysql.cj.jdbc.Driver'}
url_dim = 'jdbc:mysql://ip:port/kscs_dim?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_ods = 'jdbc:mysql://am-bp14fo4aez0qi09y5167320o.ads.aliyuncs.com:3306/kscs_ods?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_dwd = 'jdbc:mysql://ip:port/kscs_dwd?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_dws = 'jdbc:mysql://ip:port/kscs_dws?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
url_ads = 'jdbc:mysql://ip:port/kscs_ads?useUnicode=true&autoReconnect=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai'
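The five URLs above differ only in host and database name, so they could be generated from one template. The following is a minimal sketch of that idea; `build_jdbc_url` and `COMMON_PARAMS` are hypothetical names introduced here, not part of PySpark or the MySQL driver.

```python
# Query parameters shared by every URL in this article (copied from the URLs above).
COMMON_PARAMS = {
    'useUnicode': 'true',
    'autoReconnect': 'true',
    'characterEncoding': 'utf8',
    'allowMultiQueries': 'true',
    'serverTimezone': 'Asia/Shanghai',
}


def build_jdbc_url(host, port, database, params=None):
    """Assemble a MySQL JDBC URL (hypothetical helper, for illustration only)."""
    merged = dict(COMMON_PARAMS)
    merged.update(params or {})
    # Joined by hand so that 'Asia/Shanghai' keeps its slash unescaped,
    # matching the literal URLs above.
    query = '&'.join('{}={}'.format(key, value) for key, value in merged.items())
    return 'jdbc:mysql://{}:{}/{}?{}'.format(host, port, database, query)


# 'ip' and 'port' are the same placeholders the article uses.
url_dim = build_jdbc_url('ip', 'port', 'kscs_dim')
print(url_dim)
```

Keeping the shared parameters in one place means a change such as a different time zone only has to be made once.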

III. Reading MySQL data and writing it back to MySQL

1. Read a table and write it straight to MySQL

Full read, written straight to MySQL

# Read the data
print('====read====')
data = spark.read.jdbc(url=url_ads, table='ads_ec_douyin_live_data', properties=prop)
print(type(data))  # type of the result (a DataFrame)
data.show()  # preview the rows that were read
# Write to a single table
print('====write====')
data.write.jdbc(url=url_ads, table='ads_ec_douyin_live_data_test', mode='append', properties=prop)
spark.stop()

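Partial read, written straight to MySQL

  • Read stage: the load falls on the database, so apply the filters that cut down the data volume here.
  • Processing stage: the load falls on the Spark service, so do the parsing and business logic there.
  • Use double quotes inside the SQL, because the Python string that wraps it is single-quoted.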
# Read the data
print('====read====')
data = spark.read.jdbc(url=url_ads, table='(select * from ads_ec_douyin_live_data where data_date ="2022-12-01") tb', properties=prop)
print(type(data))  # type of the result (a DataFrame)
data.show()  # preview the rows that were read
# Write to a single table
print('====write====')
data.write.jdbc(url=url_ads, table='ads_ec_douyin_live_data_test', mode='append', properties=prop)
spark.stop()
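The filtered read works by passing a parenthesised subquery with an alias in place of a table name. A small sketch of building that string is shown here; `pushdown_query` is a hypothetical helper, and it interpolates its arguments as-is, so it is only suitable for trusted, hard-coded conditions.

```python
def pushdown_query(table, where, columns='*', alias='tb'):
    """Return a '(select ...) alias' string usable as the `table` argument of spark.read.jdbc."""
    # No escaping is done here: pass only conditions you wrote yourself.
    return '(select {} from {} where {}) {}'.format(columns, table, where, alias)


query = pushdown_query('ads_ec_douyin_live_data', 'data_date = "2022-12-01"')
print(query)
# (select * from ads_ec_douyin_live_data where data_date = "2022-12-01") tb
```

The result would then be passed as `spark.read.jdbc(url=url_ads, table=query, properties=prop)`, so that the database applies the filter before any rows reach Spark.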
2. Process tables and write the result to MySQL

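Processing a table starts with creating a view. The view is then used for single-table processing or for multi-table joins, and the result is written to the database. Filter while reading wherever possible; otherwise the data volume becomes too large and processing performance suffers.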

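  • Read and process a single table
    Creating the view: register it as part of the read
# Read with a filter and register the result as a view in one step.
# createOrReplaceTempView returns None, so the result is not assigned to a variable.
# params is selected as well because the parsing query below reads it.
print('====read====')
spark.read.jdbc(url=url_ods, table='(select data_date,advertiser_id,params,data from ods_ad_qc_advertiser_data where data_date = "2022-12-01") tb', properties=prop).createOrReplaceTempView('df_adv')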

Parsing with Spark SQL: call sql directly on the SparkSession

# Parse the JSON columns
df = spark.sql("""
select data_date,
       advertiser_id,
       get_json_object(get_json_object(params,'$.filtering'),'$.marketing_goal') marketing_goal,
       cast(get_json_object(data,'$.stat_cost') as decimal(19,2)) cost,
       cast(get_json_object(data,'$.pay_order_count') as int) pay_order_count,
       cast(get_json_object(data,'$.pay_order_amount') as decimal(19,2)) pay_order_amount,
       cast(get_json_object(data,'$.show_cnt') as int) show_cnt,
       cast(get_json_object(data,'$.click_cnt') as int) click_cnt,
       cast(get_json_object(data,'$.dy_follow') as int) dy_follow,
       cast(get_json_object(data,'$.create_order_count') as int) create_order_count,
       cast(get_json_object(data,'$.create_order_amount') as decimal(19,2)) create_order_amount,
       cast(get_json_object(data,'$.prepay_order_count') as int) prepay_order_count,
       cast(get_json_object(data,'$.prepay_order_amount') as decimal(19,2)) prepay_order_amount
from df_adv""")
df.show()

Parsing with Spark SQL: use the sql method on sqlContext
Before using this method, sqlContext must already be defined, for example sqlContext = SQLContext(spark.sparkContext, spark)

# Parse the JSON columns
df = sqlContext.sql("""
select data_date,
       advertiser_id,
       get_json_object(get_json_object(params,'$.filtering'),'$.marketing_goal') marketing_goal,
       cast(get_json_object(data,'$.stat_cost') as decimal(19,2)) cost,
       cast(get_json_object(data,'$.pay_order_count') as int) pay_order_count,
       cast(get_json_object(data,'$.pay_order_amount') as decimal(19,2)) pay_order_amount,
       cast(get_json_object(data,'$.show_cnt') as int) show_cnt,
       cast(get_json_object(data,'$.click_cnt') as int) click_cnt,
       cast(get_json_object(data,'$.dy_follow') as int) dy_follow,
       cast(get_json_object(data,'$.create_order_count') as int) create_order_count,
       cast(get_json_object(data,'$.create_order_amount') as decimal(19,2)) create_order_amount,
       cast(get_json_object(data,'$.prepay_order_count') as int) prepay_order_count,
       cast(get_json_object(data,'$.prepay_order_amount') as decimal(19,2)) prepay_order_amount
from df_adv""")
df.show()
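To check a sample payload locally before running the job, the `get_json_object(...)` plus `cast(... as decimal(19,2))` pattern can be approximated in plain Python. This is only a rough stand-in under stated assumptions: it handles top-level `$.key` paths only, the rounding mode is an assumption, and the sample payload is made up. `get_json_field` and `to_decimal_2` are hypothetical names.

```python
import json
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP


def get_json_field(json_str, key):
    """Rough stand-in for get_json_object(json_str, '$.key'), top-level keys only."""
    try:
        parsed = json.loads(json_str)
    except (TypeError, ValueError):
        return None  # unparseable input is treated as NULL
    if not isinstance(parsed, dict):
        return None
    return parsed.get(key)  # a missing key is treated as NULL


def to_decimal_2(value):
    """Rough stand-in for cast(value as decimal(19,2)); None stays None."""
    if value is None:
        return None
    try:
        number = Decimal(str(value))
    except InvalidOperation:
        return None  # non-numeric text is treated as NULL
    return number.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)


sample = '{"stat_cost": "12.345", "show_cnt": 100}'  # made-up payload
print(to_decimal_2(get_json_field(sample, 'stat_cost')))  # 12.35
print(get_json_field(sample, 'click_cnt'))                # None (key missing)
```

A check like this makes it easier to spot keys that are missing from the payload, which would otherwise show up as NULL columns in the target table.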

Writing the data

# Write to a single table
print('====write====')
df.write.jdbc(url=url_dws, table='dws_ad_qc_advertiser_data_test', mode='append', properties=prop)
spark.stop()
  • Read and process multiple tables
    Creating the views: read first, then register as views
# Read the data
print('====read====')
data_shop = spark.read.jdbc(url=url_dim, table='dim_ec_douyin_shop_account_info', properties=prop)
data_seller = spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_seller', properties=prop)
data_talent = spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_talent', properties=prop)
print(type(data_seller))  # type of the result (a DataFrame)
data_seller.show()  # preview the rows that were read
# Register the views
sqlContext = SQLContext(spark.sparkContext, spark)
sqlContext.registerDataFrameAsTable(data_shop, 'df_shop')
sqlContext.registerDataFrameAsTable(data_seller, 'df_seller')
sqlContext.registerDataFrameAsTable(data_talent, 'df_talent')

Creating the views: register them as part of the read

# Read the data and register the views in one step
# (createOrReplaceTempView returns None, so nothing is assigned)
print('====read====')
spark.read.jdbc(url=url_dim, table='dim_ec_douyin_shop_account_info', properties=prop).createOrReplaceTempView('df_shop')
spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_seller', properties=prop).createOrReplaceTempView('df_seller')
spark.read.jdbc(url=url_dwd, table='dwd_ec_douyin_live_detail_data_talent', properties=prop).createOrReplaceTempView('df_talent')
spark.sql('select * from df_seller').show()  # preview the view's data

Joining multiple views
The example below uses the sql method on sqlContext.

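# Multi-table join: every computed column needs an alias that matches the target table's column name; otherwise the write cannot tell which table column the value belongs to and fails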
df = sqlContext.sql("""select now() add_time,now() update_time,a.data_date,a.shop_num,a.shop_name,a.brand,a.trade,a.shop_manager,a.trade_manager,a.dept_name,a.`power`,a.status,a.start_date,a.end_date,ifnull(a.account_opr_type,b.account_opr_type) account_opr_type,a.account_type,a.account_id,a.account_name,b.live_id,b.live_show_pv,b.live_show_uv,b.watch_uv,b.watch_uv_zr,b.watch_uv_ff,b.pay_ucnt,b.a_c_u,b.p_c_u,b.watch_cvr_uv,b.watch_interact_rate_uv,b.avg_watch_duration,b.new_fans_cnt,b.watch_follow_rate_uv,b.like_cnt,b.comment_cnt,cast(b.pay_ucnt_interact+b.unpaid_ucnt_interact as int) interact_uv,cast(b.watch_uv*b.watch_uv_rate_fans as int) watch_uv_fans,b.watch_uv_rate_fans,b.unpaid_ucnt,b.watch_unpaid_rate,b.pay_ucnt_newc,b.pay_ucnt_rate_newc,cast(b.pay_ucnt_newc*b.fans_ucnt_rate_paynewc as int) fans_ucnt_paynewc,cast(b.pay_ucnt_newc*b.interact_ucnt_rate_paynewc as int) interact_ucnt_paynewc,b.fans_ucnt_rate_paynewc,b.interact_ucnt_rate_paynewc,b.pay_ucnt_oldc,b.pay_ucnt_rate_oldc,cast(b.pay_ucnt_oldc*b.fans_ucnt_rate_payoldc as int) fans_ucnt_payoldc,cast(b.pay_ucnt_oldc*b.interact_ucnt_rate_payoldc as int) interact_ucnt_payoldc,b.fans_ucnt_rate_payoldc,b.interact_ucnt_rate_payoldc,b.pay_ucnt_interact,b.pay_ucnt_rate_interact,cast(b.pay_ucnt_interact*b.fans_ucnt_rate_payinteract as int) fans_ucnt_payinteract,cast(b.pay_ucnt_interact*b.newc_ucnt_rate_payinteract as int) newc_ucnt_payinteract,b.fans_ucnt_rate_payinteract,b.newc_ucnt_rate_payinteract,b.pay_ucnt_nointeract,b.pay_ucnt_rate_nointeract,cast(b.pay_ucnt_nointeract*b.fans_ucnt_rate_paynointeract as int) fans_ucnt_paynointeract,cast(b.pay_ucnt_nointeract*b.newc_ucnt_rate_paynointeract as int) newc_ucnt_paynointeract,b.fans_ucnt_rate_paynointeract,b.newc_ucnt_rate_paynointeract,b.pay_ucnt_fans,b.pay_ucnt_rate_fans,cast(b.pay_ucnt_fans*b.newfans_ucnt_rate_payfans as int) 
newfans_ucnt_payfans,b.interact_ucnt_rate_payfans,b.newfans_ucnt_rate_payfans,b.pay_ucnt_passer,b.pay_ucnt_rate_passer,cast(b.pay_ucnt_passer*b.interact_ucnt_rate_paypasser as int) interact_ucnt_paypasser,cast(b.pay_ucnt_passer*b.newc_ucnt_rate_paypasser as int) newc_ucnt_paypasser,b.interact_ucnt_rate_paypasser,b.newc_ucnt_rate_paypasser,b.watch_uv_unpaidval,b.watch_uv_rate_unpaidval,cast(b.watch_uv_unpaidval*b.fans_ucnt_rate_unpaidval as int) fans_ucnt_unpaidval,cast(b.watch_uv_unpaidval*b.interact_ucnt_rate_unpaidval as int) interact_ucnt_unpaidval,b.fans_ucnt_rate_unpaidval,b.interact_ucnt_rate_unpaidval,b.watch_uv_unpaidinv,b.watch_uv_rate_unpaidinv,cast(b.watch_uv_unpaidinv*b.fans_ucnt_rate_unpaidinv as int) fans_ucnt_unpaidinv,cast(b.watch_uv_unpaidinv*b.interact_ucnt_rate_unpaidinv as int) interact_ucnt_unpaidinv,b.fans_ucnt_rate_unpaidinv,b.interact_ucnt_rate_unpaidinv,b.unpaid_ucnt_interact,b.unpaid_ucnt_rate_interact,cast(b.unpaid_ucnt_interact*b.fans_ucnt_rate_unpaidinteract as int) fans_ucnt_unpaidinteract,cast(b.unpaid_ucnt_interact*b.watch_ucnt_rate_inv_unpaidinteract as int) watch_ucnt_inv_unpaidinteract,b.fans_ucnt_rate_unpaidinteract,b.watch_ucnt_rate_inv_unpaidinteract,b.unpaid_ucnt_nointeract,b.unpaid_ucnt_rate_nointeract,cast(b.unpaid_ucnt_nointeract*b.fans_ucnt_rate_unpaidnointeract as int) fans_ucnt_unpaidnointeract,cast(b.unpaid_ucnt_nointeract*b.watch_ucnt_rate_inv_unpaidnointeract as int) watch_ucnt_inv_unpaidnointeract,b.fans_ucnt_rate_unpaidnointeract,b.watch_ucnt_rate_inv_unpaidnointeract,b.unpaid_ucnt_fans,b.unpaid_ucnt_rate_fans,cast(b.unpaid_ucnt_fans*b.watch_ucnt_rate_inv_unpaidfans as int) watch_ucnt_inv_unpaidfans,b.interact_ucnt_rate_unpaidfans,b.watch_ucnt_rate_inv_unpaidfans,b.unpaid_ucnt_passer,b.unpaid_ucnt_rate_passer,cast(b.unpaid_ucnt_passer*b.interact_ucnt_rate_unpaidpasser as int) interact_ucnt_unpaidpasser,cast(b.unpaid_ucnt_passer*b.watch_ucnt_rate_inv_unpaidpasser as int) 
watch_ucnt_inv_unpaidpasser,b.interact_ucnt_rate_unpaidpasser,b.watch_ucnt_rate_inv_unpaidpasser,b.punish_cnt,b.punish_detail
from
-- shop/account information
(select * from df_shop where data_date>='2022-12-06') a
left join
-- live-stream detail data
(
-- seller view
SELECT data_date,shop_num,shop_name,account_opr_type,account_id,live_id,live_show_pv,live_show_uv,watch_uv,watch_uv_zr,watch_uv_ff,pay_ucnt,a_c_u,p_c_u,watch_cvr_uv,watch_interact_rate_uv,avg_watch_duration,new_fans_cnt,watch_follow_rate_uv,like_cnt,comment_cnt,watch_uv_rate_fans,unpaid_ucnt,watch_unpaid_rate,pay_ucnt_newc,pay_ucnt_rate_newc,fans_ucnt_rate_paynewc,interact_ucnt_rate_paynewc,pay_ucnt_oldc,pay_ucnt_rate_oldc,fans_ucnt_rate_payoldc,interact_ucnt_rate_payoldc,pay_ucnt_interact,pay_ucnt_rate_interact,fans_ucnt_rate_payinteract,newc_ucnt_rate_payinteract,pay_ucnt_nointeract,pay_ucnt_rate_nointeract,fans_ucnt_rate_paynointeract,newc_ucnt_rate_paynointeract,pay_ucnt_fans,pay_ucnt_rate_fans,interact_ucnt_rate_payfans,newfans_ucnt_rate_payfans,pay_ucnt_passer,pay_ucnt_rate_passer,interact_ucnt_rate_paypasser,newc_ucnt_rate_paypasser,watch_uv_unpaidval,watch_uv_rate_unpaidval,fans_ucnt_rate_unpaidval,interact_ucnt_rate_unpaidval,watch_uv_unpaidinv,watch_uv_rate_unpaidinv,fans_ucnt_rate_unpaidinv,interact_ucnt_rate_unpaidinv,unpaid_ucnt_interact,unpaid_ucnt_rate_interact,fans_ucnt_rate_unpaidinteract,watch_ucnt_rate_inv_unpaidinteract,unpaid_ucnt_nointeract,unpaid_ucnt_rate_nointeract,fans_ucnt_rate_unpaidnointeract,watch_ucnt_rate_inv_unpaidnointeract,unpaid_ucnt_fans,unpaid_ucnt_rate_fans,interact_ucnt_rate_unpaidfans,watch_ucnt_rate_inv_unpaidfans,unpaid_ucnt_passer,unpaid_ucnt_rate_passer,interact_ucnt_rate_unpaidpasser,watch_ucnt_rate_inv_unpaidpasser,punish_cnt,punish_detail
FROM df_seller where data_date>='2022-12-06'
union
-- talent view
SELECT data_date,shop_num,shop_name,account_opr_type,account_id,live_id,live_show_pv,live_show_uv,watch_uv,watch_uv_zr,watch_uv_ff,pay_ucnt,a_c_u,p_c_u,watch_cvr_uv,watch_interact_rate_uv,avg_watch_duration,new_fans_cnt,watch_follow_rate_uv,like_cnt,comment_cnt,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,punish_cnt,punish_detail
FROM df_talent where data_date>='2022-12-06'
) b
on a.data_date=b.data_date and a.shop_num=b.shop_num and a.account_id=b.account_id""")
print(type(df))
df.show()
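Because an unaliased expression ends up with a generated column name that the target table does not have, it can help to compare column names before writing. The sketch below assumes the target table's column list is already known; `check_columns` and the sample column lists are hypothetical, and in a real job the first argument would come from `df.columns`.

```python
def check_columns(df_columns, table_columns):
    """Return (missing, extra) column names, compared case-insensitively.

    missing: table columns the DataFrame does not provide
    extra:   DataFrame columns the table does not have (these break the write)
    """
    df_set = {name.lower() for name in df_columns}
    table_set = {name.lower() for name in table_columns}
    return sorted(table_set - df_set), sorted(df_set - table_set)


# Made-up column lists for illustration; the third DataFrame column
# stands for a computed expression that was left without an alias.
df_columns = ['data_date', 'shop_num', 'cast(a + b as int)']
table_columns = ['id', 'data_date', 'shop_num', 'interact_uv']

missing, extra = check_columns(df_columns, table_columns)
print(missing)  # ['id', 'interact_uv']
print(extra)    # ['cast(a + b as int)']
```

A non-empty `extra` list points at expressions that still need an alias. Entries in `missing` are only a problem if the table column has no default value.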

Writing to MySQL

# Write the data
print('====write====')
df.write.jdbc(url=url_dws, table='dws_ec_douyin_live_detail_data_test', mode='append', properties=prop)
spark.stop()

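IV. Submitting the program

The program can be launched from a scheduling platform (the scheduler's server must have a Spark runtime environment deployed).

It can also be submitted to the cluster directly from the Spark server.
Place test.py at the following path on the server: dolphinscheduler/data/kscs/resources/test.py, then run the script below: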

# Submit the program to Spark in local mode
./bin/spark-submit --master local --driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 2G --jars mysql-connector-java-8.0.30.jar data/kscs/resources/test.py
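When the job is launched from a scheduler script rather than typed by hand, the same command can be assembled as an argument list. This is a sketch only: `build_submit_command` is a hypothetical helper whose defaults simply mirror the flags in the command above.

```python
import shlex


def build_submit_command(script, jars, master='local', driver_cores=1,
                         driver_memory='512M', num_executors=2,
                         executor_cores=2, executor_memory='2G'):
    """Return the spark-submit arguments as a list (hypothetical helper)."""
    return [
        './bin/spark-submit',
        '--master', master,
        '--driver-cores', str(driver_cores),
        '--driver-memory', driver_memory,
        '--num-executors', str(num_executors),
        '--executor-cores', str(executor_cores),
        '--executor-memory', executor_memory,
        '--jars', ','.join(jars),  # --jars takes a comma-separated list
        script,
    ]


cmd = build_submit_command('data/kscs/resources/test.py',
                           ['mysql-connector-java-8.0.30.jar'])
# Print the command in a shell-safe form for logging.
print(' '.join(shlex.quote(part) for part in cmd))
```

The list could then be handed to `subprocess.run(cmd, check=True)` from the Spark installation directory, which avoids shell quoting problems.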
