• 多进程学习【python 多进程传参】pool.map() 函数传多参数

multiprocessing模块,同时提供本地和远程并发,使用子进程代替线程,有效避免Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Linux 和 Windows 上都可以运行。

HDFSAppid.py

# -*- coding: utf-8 -*-
import os
import datetime
import sys
import sys, getopt"""今天"""
date = (datetime.datetime.now()-datetime.timedelta(1)).strftime("%Y-%m-%d")
app_ids = []"""判断hadoop里当天有哪些APP_id,组成list"""
def summary_app():f = os.popen('/data/hadoop-2.9.0/bin/hdfs dfs -ls hdfs://ip地址:端口/ctrp/click/{}'.format(date))file_list = f.readlines()for file in file_list[1:]:app_ids.append(file.strip().split(' ')[-1].split('/')[-1])return app_ids

fengff_click.py,对上述app_ids的list,分别执行每个app_id,对点击率建模

# -*- coding: utf-8 -*-
import os
import datetime
import sys
import sys, getopt
import multiprocessing
from HDFSAppid import summary_app
import time"""今天"""
date = datetime.datetime.now().strftime("%Y-%m-%d")
last_date = (datetime.datetime.now()-datetime.timedelta(3)).strftime('%Y-%m-%d')
lib_file = 'orign.libffm'
libffm_file = 'click.libffm'
libffm_train = 'click_train.libffm'
libffm_eval = 'click_eval.libffm'
libffm_test = 'click_test.libffm'
"""判断hadoop里是否有目标数据,有:下载到服务器上;无:邮件提醒"""def run_model(app_id):t1 = time.time()# 判断路径下是否有相关文件,没有的话,新建,否则,不新建def run_url():if os.path.exists('/data/marq/fengff/{}'.format(str(abs(int(app_id))))) == False:dir = os.chdir('/data/marq/fengff/')os.system('mkdir {}'.format(str(abs(int(app_id)))))if os.path.exists('/data/marq/fengff/{}/click_fm'.format(str(abs(int(app_id))))) == False:v_dir = os.chdir('/data/marq/fengff/{}/'.format(str(abs(int(app_id)))))os.system('mkdir click_fm')val_dir = os.chdir('/data/marq/fengff/{}/click_fm/'.format(str(abs(int(app_id)))))  # 把当前工作目录切换到dirname下os.system('mkdir {}'.format(date))if os.path.exists('/data/marq/fengff/{}/click_fm/{}'.format(str(abs(int(app_id))),last_date)) == True:os.system('rm -rf {}'.format(last_date))data_dir = os.chdir('/data/marq/fengff/{}/click_fm/{}'.format(str(abs(int(app_id))),date))  # 把当前工作目录切换到dirname下os.system('mkdir model')run_url()# 对点击率执行的过程及结果输出到相应的文件里,方便后续对每个APP_id根据结果,进行上线。with open ('click_{}.log'.format(app_id),'w+') as ff:click_dir = os.getcwd()  # 当前数据的工作目录路径ff.write("当前数据的工作目录路径:"+str(click_dir)+'\n')f = os.popen('/data/hadoop-2.9.0/bin/hdfs dfs -ls hdfs://10.80.1.161:8020/ctrp/ifs-1-5-0/sample/ffm/click/{}/{}/'.format(date,str(app_id)))file_list = f.readlines()target_file = file_list[1].strip().split(' ')[-1]if "_SUCCESS" in target_file:os.system('/data/hadoop-2.9.0/bin/hdfs dfs -cat hdfs://10.80.1.161:8020/ctrp/ifs-1-5-0/sample/ffm/click/{}/{}/part-* > {}'.format(date,str(app_id),os.path.join(click_dir, lib_file)))os.system('/data/hadoop-2.9.0/bin/hdfs dfs -cat hdfs://10.80.1.161:8020/ctrp/ifs-1-5-0/sample/ffm/click/{}/{}_schema.json > {}/click_schema.json'.format(date,str(app_id),os.path.join(click_dir,'model')))f = os.popen('wc -l {}'.format(lib_file))lines = f.readlines()ff.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")+'\n')ff.write("点击率全样本共有{}行".format(str(lines))+'\n')os.system('shuf {} -o {}'.format(lib_file, libffm_file))a = ''.join(lines)a = int(a.split(' ')[0])b = int(a // 5)ff.write("每等份有{}行".format(str(b)) + '\n')os.system('split -{} {}'.format(b, libffm_file))os.system('cat xaa > {}'.format(libffm_eval))os.system('cat xab > {}'.format(libffm_test))os.system('cat xac xad xae xaf > {}'.format(libffm_train))os.system('rm -f xaa xab xac xad xae xaf')libff_dir = os.chdir("/data/fengsj/bin/libff")  # 把当前工作目录切换到模型路径下os.system('./ffm-train -l 0.00001 -k 8 -t 50 -r 0.05 -c 0.05 -s 24 -autostop 4 -p {} {} {}/clkoutput.txt'.format(os.path.join(click_dir, libffm_eval), os.path.join(click_dir, libffm_train),os.path.join(click_dir, 'model')))f = os.popen('./ffm-predict {} {}/clkoutput.txt {}/predict_output.txt {} 0.05'.format(os.path.join(click_dir, libffm_test), os.path.join(click_dir, 'model'),os.path.join(click_dir, 'model'), str(b)))t2 = time.time()ff.write("本APP_id执行时间为:"+str(int(t2-t1)) + 's' +'\n')ff.write(str(f.readlines()))f.close()return Trueelse:ff.close()return Falsedef main():app_ids = summary_app()print("app_ids:",app_ids)t1 = time.time()pool = multiprocessing.Pool(processes=20)  #创建拥有20个进程数量的进程池pool.map(run_model,app_ids)pool.close()  #关闭进程池,不再接受新的进程pool.join()  #主进程阻塞等待子进程的退出t2 = time.time()print('并行执行时间:{}s'.format(str(int(t2-t1))))if __name__ == '__main__':main()  # 0:运行的文件名,1往后是输入的参数   命令行里示例:Python fengff_imp.py -i inputfile -o outputfile

submitmodel.py 对上述APP各自建模情况,取各自最后的效果数据
,这里需要注意自定义变量

import sys
from HDFSAppid import summary_appapp_ids = summary_app()
for app_id in app_ids:exec('clkmap{} = {}'.format(abs(int(app_id)), {}))   # 自定义变量exec('impmap{} = {}'.format(abs(int(app_id)), {}))for app_id in app_ids:with open('{}/{}/click_fm/{}/click_{}.log'.format(ddir,str(abs(int(app_id))),date,str(app_id))) as f:cli_fm = f.readlines()[-1]cli_list = cli_fm.split('[')[-1].split(',')[0:-1]  # 去掉imp log日志的最后一行的最后一句话“wantlsd from heart!\n”for l in cli_list:target = l.split(' = ')[0].split("'")[1]if target == 'oe':value = l.split(' = ')[1:][0].split(' ')[0]else:value = l.split(' = ')[-1]eval('clkmap'+str(abs(int(app_id))))[target] = valuef.close()with open('{}/{}/req-imp/{}/imp_{}.log'.format(ddir,str(abs(int(app_id))),date,str(app_id))) as f:cli_fm = f.readlines()[-1]cli_list = cli_fm.split('[')[-1].split(',')[0:-1]  # 去掉imp log日志的最后一行的最后一句话“wantlsd from heart!\n”for l in cli_list:target = l.split(' = ')[0].split("'")[1]if target == 'oe':value = l.split(' = ')[1:][0].split(' ')[0]else:value = l.split(' = ')[-1]eval('impmap'+str(abs(int(app_id))))[target] = valuef.close()print('app_id为{}的点击率指标为:{}'.format(app_id, eval('clkmap' + str(abs(int(app_id))))))print('app_id为{}的曝光率指标为:{}'.format(app_id, eval('impmap' + str(abs(int(app_id))))))

python 对各app用多进程方式自动化建模并推送线上相关推荐

  1. Docker之使用maven插件【Dockerfile方式】构建并推送镜像到私有仓库

    准备工作 操作系统版本 [root@node02 ~]# cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) docker版本 [ ...

  2. 用python控制钉钉软件_Python实现钉钉消息推送

    钉钉是一个功能非常完善的办公软件,对于数据来说能不能也可以很好的利用钉钉这个资源呢?答案是肯定的.钉钉机器人是钉钉群的一个高级扩展功能,而且有很多种类型,如下: 对于我们日常数据消息推送的话则选择自定 ...

  3. 基于python命令流及代码的Plaxis自动化建模

    有限单元法在岩土工程问题中应用非常广泛,很多商业软件如Plaxis/Abaqus/Comsol等都采用有限单元解法.在使用各大软件进行数值模拟建模的过程中,您是否发现GUI界面中重复性的点击输入工作太 ...

  4. 【Python基础】Github标星4.7k,每天推送一个python小实例的Python库

    文章来源于Python与算法社区,作者zhenguo 推荐一个 python-small-examples 库,每天推送一个Python实例.时至今日,共有4700人 star 期间多次登上githu ...

  5. python获取钉钉日志数据_python3实现zabbix告警推送钉钉的示例

    自己写了一个简单的python脚本,用来推送zabbix告警到钉钉机器人,推送格式为markdown,有需要的可以自己修改markdown的格式及推送的值(zabbix宏) 环境如下,理论上zabbi ...

  6. android 杀死程序收不到推送_Android APP切换到后台接收不到推送消息

    1.   Android端进程被杀死后,目前自带的保护后台接收消息活跃机制.暂时没有什么好的机制保持任何情况下都活跃 android原生系统用home键杀进程可以起来,如果是强行停止就只能用户自己手动 ...

  7. 多个不同的app应用间应该如何进行消息推送呢?

    现在很多公司做app应用都会用到推送,推送这个不多说了,怎么做网上一堆,用的比较多的还数极光推送(Jpush)以及百度推送,目前我们使用Jpush,文档方面质量是差了点..这个先不吐槽,主要现在的ap ...

  8. python发钉钉消息_Python调用钉钉机器人推送消息

    1.创建钉钉机器人 群机器人是钉钉群的高级扩展功能,群机器人可以将第三方服务的信息聚合到群聊中,实现自动化的信息同步: 进入一个钉钉群,点击右上角的"群设置"进入群设置页面: 选择 ...

  9. android实现推送方式解决方案,Android实现推送方式解决方案系列教程

    Android实现推送方式解决方案系列教程 1-5 Android实现推送方式解决方案系列教程 总结者:难民 交流Q群: 137824028 Android实现推送方式解决方案系列之一--XMPP协议 ...

最新文章

  1. linux下使用source /etc/profile保存配置后,新的环境变量只能在一个终端里面有效
  2. nginx负载均衡和lvs负载均衡的比较分析
  3. jQuery--.css(width)和.width()的区别
  4. mac电脑开机键盘和鼠标失灵
  5. CentOS7搭建hadoop2.6.4+HBase1.1.6
  6. bzoj4013: [HNOI2015]实验比较
  7. 【转贴】爱是莲花,最美的是清苦的莲心
  8. 本机连接opc server有部分数据不刷新_实时数据库PI在企业MES系统中的应用
  9. 给大家送一个机械轴键盘~
  10. js获取文件的后缀名
  11. 数据挖掘-Apriori算法
  12. 空气螺旋桨,让水下动物不再受伤害
  13. 微信为什么不能下载apk以及微信下载APK的解决办法
  14. Flutter修改App名称(Android+IOS)
  15. 如何免费复制网页内容
  16. 采购者具体负责的问题
  17. 快递分拣程序 python_顺丰快递分拣程序
  18. ARM公司为何如此成功
  19. 计算两个字符串之间的相似度
  20. Working Practice-设置免打扰时间

热门文章

  1. QQ启动时:Initialization failure:0x0000000C 解决方案
  2. 什么是 web 开发
  3. 【前端开发API】豆瓣开放API
  4. Clion和STM32CubeMx新建项目
  5. 小泼猴案例页面的动态渲染
  6. 给自己看:Java路上,一个萝卜一个坑
  7. obs点歌插件 html效果,OBS歌曲显示插件使用图文教程
  8. 手写Web服务器(三)
  9. 大端和小端的含义及判断代码
  10. 快速集成融云SDK– Android Studio