Recently there was a requirement: every 10 minutes, compute statistics on the current day's check/download/upgrade data for each project, then insert the results into the database.
Say the first run's statistics are A.txt,
and 10 minutes later the statistics are B.txt.
Only a few projects' numbers will have changed; most stay the same, so there is no need to write all of B back to the database.
Only the changed rows should be updated.

A search on Stack Overflow turned up an approach that fits this requirement well:

result = pd.merge(dataframe_b, dataframe_a, how='left', indicator=True).query("_merge=='left_only'").drop(columns='_merge')
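To see what the one-liner does, here is a minimal sketch with made-up frames standing in for A.txt and B.txt:

import pandas as pd

dataframe_a = pd.DataFrame({'project': ['p1', 'p2', 'p3'],
                            'count':   [10,   20,   30]})   # first run (A.txt)
dataframe_b = pd.DataFrame({'project': ['p1', 'p2', 'p3'],
                            'count':   [10,   25,   30]})   # 10 minutes later (B.txt)

# Left-merge B against A on all shared columns; rows marked 'left_only'
# exist only in B, i.e. they are new or changed since the last run.
result = (pd.merge(dataframe_b, dataframe_a, how='left', indicator=True)
            .query("_merge == 'left_only'")
            .drop(columns='_merge'))
print(result)   # only the p2 row, whose count changed from 20 to 25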

Putting the code together:
sql_config.py

import pandas as pd
from sqlalchemy import create_engine
import common.path_config as config
import logging


class SQLConfig(object):
    def __init__(self):
        # config holds all the database connection settings
        self.ip = config.ip
        self.port = config.port
        self.database = config.database
        self.username = config.username
        self.password = config.password
        self.line = "mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8" % (
            self.username, self.password, self.ip, self.port, self.database)
        self.engine = self.__get_engine()
        self.table = None
        self.dataframe = None
        self.result = None
        self.path = None
        self.logger = self.get_logger(__name__)

    def get_logger(self, name):
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(name)
        return logger

    def __get_engine(self):
        return create_engine(self.line)

    def execute(self, pt, *args, **kwargs):
        # subclasses override this to fill self.dataframe for the day `pt`
        pass

    def delete_result(self, pt, *args, **kwargs):
        self.logger.warning("delete table:" + self.table + " in day(" + pt + ")'s data!")
        self.engine.execute("delete from {tablename} where pt='{date}'".format(
            tablename=self.table, date=pt))

    def sub_duplicate_data(self, pt, *args, **kwargs):
        # drop every row that is identical to the previous run's snapshot
        if self.dataframe is None or self.dataframe.size == 0:
            return
        try:
            origin_dataframe = pd.read_pickle(self.path)
            self.result = (pd.merge(self.dataframe, origin_dataframe, how='left', indicator=True)
                             .query("_merge == 'left_only'")
                             .drop(columns='_merge'))
        except IOError:
            # first run: no snapshot yet, so every row counts as new
            self.logger.warning("the snapshot path %s does not exist!" % self.path)
            self.result = self.dataframe

    def save_result(self, *args, **kwargs):
        self.dataframe.to_sql(self.table, con=self.engine,
                              if_exists="append", index=False, chunksize=10000)

    def run_all(self, pt, offline=True, *args, **kwargs):
        self.execute(pt, *args, **kwargs)
        if offline:
            # offline mode: wipe the day's rows and re-insert everything
            self.delete_result(pt, *args, **kwargs)
        else:
            # online mode: keep only the rows that changed since the last run
            self.sub_duplicate_data(pt, *args, **kwargs)
        self.save_result(pt, *args, **kwargs)
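sub_duplicate_data is where the A.txt/B.txt idea from the top lives: the previous run's full result sits in a pickle at self.path, and the fresh result is anti-joined against it. A minimal sketch of that round trip, with a hypothetical path and made-up frames:

import pandas as pd

snapshot_path = '/tmp/product_delta_demo.pkl'  # hypothetical; the real path comes from path_config

# First run: persist the full result as the snapshot.
first = pd.DataFrame({'product_id': [1, 2], 'count_check': [10, 20]})
first.to_pickle(snapshot_path)

# Next run: keep only the rows that differ from the snapshot.
second = pd.DataFrame({'product_id': [1, 2], 'count_check': [10, 25]})
previous = pd.read_pickle(snapshot_path)
delta = (pd.merge(second, previous, how='left', indicator=True)
           .query("_merge == 'left_only'")
           .drop(columns='_merge'))
print(delta)                     # only product_id 2, whose count changed
second.to_pickle(snapshot_path)  # overwrite, so the next run diffs against today's result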

stats_product_delta_merge_online.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
sys.path.append('..')
from datetime import datetime
from common.sql_config import SQLConfig
from common.path_config import base_path
import pandas as pd


class ProductDeltaMergeOnline(SQLConfig):
    def __init__(self):
        super(ProductDeltaMergeOnline, self).__init__()
        self.table = 'stats_product_delta_merge'
        self.path = base_path + 'product_delta_merge_online'
        self.logger = self.get_logger(__name__)

    def execute(self, pt):
        self.logger.info("executing delta stats for: " + pt)
        # count successful checks
        sql_check = "SELECT product_id,delta_id,origin_version,now_version AS target_version,COUNT(DISTINCT device_id) count_check FROM " \
                    "iot_otalog.ota_interface_check_info_log WHERE create_time BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' GROUP BY product_id,delta_id,origin_version,target_version".format(date=pt)
        self.logger.info(sql_check)
        check_count = pd.read_sql(sql_check, con=self.engine)
        # count successful downloads
        sql_download = "SELECT product_id,delta_id,origin_version,now_version AS target_version,COUNT(DISTINCT device_id) count_download FROM " \
                       "iot_otalog.ota_interface_download_info_log WHERE create_time BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' AND download_status=1 GROUP BY product_id,delta_id,origin_version,target_version".format(date=pt)
        self.logger.info(sql_download)
        download_count = pd.read_sql(sql_download, con=self.engine)
        # count failed downloads
        sql_downfail = "SELECT product_id,delta_id,origin_version,now_version AS target_version,COUNT(DISTINCT device_id) count_downfail FROM " \
                       "iot_otalog.ota_interface_download_info_log WHERE create_time BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' AND download_status!=1 GROUP BY product_id,delta_id,origin_version,target_version".format(date=pt)
        self.logger.info(sql_downfail)
        downfail_count = pd.read_sql(sql_downfail, con=self.engine)
        # count successful upgrades
        sql_upgrade = "SELECT product_id,delta_id,origin_version,now_version AS target_version,COUNT(DISTINCT device_id) count_upgrade FROM " \
                      "iot_otalog.ota_interface_upgrade_info_log WHERE create_time BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' AND update_status=1 GROUP BY product_id,delta_id,origin_version,target_version".format(date=pt)
        self.logger.info(sql_upgrade)
        upgrade_count = pd.read_sql(sql_upgrade, con=self.engine)
        # count failed upgrades
        sql_upfail = "SELECT product_id,delta_id,origin_version,now_version AS target_version,COUNT(DISTINCT device_id) count_upgradefail FROM " \
                     "iot_otalog.ota_interface_upgrade_info_log WHERE create_time BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' AND update_status!=1 GROUP BY product_id,delta_id,origin_version,target_version".format(date=pt)
        self.logger.info(sql_upfail)
        upfail_count = pd.read_sql(sql_upfail, con=self.engine)
        # merge the five result sets into one dataframe
        result_merge1 = pd.merge(check_count, download_count, how='outer',
                                 on=['product_id', 'delta_id', 'origin_version', 'target_version'])
        result_merge2 = pd.merge(result_merge1, downfail_count, how='outer',
                                 on=['product_id', 'delta_id', 'origin_version', 'target_version'])
        result_merge3 = pd.merge(result_merge2, upgrade_count, how='outer',
                                 on=['product_id', 'delta_id', 'origin_version', 'target_version'])
        result_merge4 = pd.merge(result_merge3, upfail_count, how='outer',
                                 on=['product_id', 'delta_id', 'origin_version', 'target_version'])
        # fill missing counts with 0
        result_fill_na = result_merge4.fillna(value={'count_download': 0, 'count_downfail': 0,
                                                     'count_upgrade': 0, 'count_upgradefail': 0,
                                                     'count_check': 0})
        # add the date column
        result_fill_na['pt'] = pt
        self.dataframe = result_fill_na

    def save_result(self, pt):
        if self.result is None or self.result.size == 0:
            self.logger.warning('no new data; nothing to save')
            return
        for indexs in self.result.index:
            sql_rep = "replace into {table_name}" \
                      "(product_id,delta_id,origin_version,target_version," \
                      "count_check,count_download,count_downfail,count_upgrade,count_upgradefail,pt) " \
                      "values({product_id},{delta_id},'{origin_version}','{target_version}'," \
                      "{count_check},{count_download},{count_downfail},{count_upgrade},{count_upgradefail},'{pt}')" \
                .format(table_name=self.table,
                        product_id=self.result.loc[indexs, 'product_id'],
                        delta_id=self.result.loc[indexs, 'delta_id'],
                        origin_version=self.result.loc[indexs, 'origin_version'],
                        target_version=self.result.loc[indexs, 'target_version'],
                        count_check=self.result.loc[indexs, 'count_check'],
                        count_download=self.result.loc[indexs, 'count_download'],
                        count_downfail=self.result.loc[indexs, 'count_downfail'],
                        count_upgrade=self.result.loc[indexs, 'count_upgrade'],
                        count_upgradefail=self.result.loc[indexs, 'count_upgradefail'],
                        pt=self.result.loc[indexs, 'pt'])
            self.engine.execute(sql_rep)
        # overwrite the old snapshot with today's full result
        self.dataframe.to_pickle(self.path)


if __name__ == '__main__':
    pt = datetime.now().strftime('%Y-%m-%d')
    print('execute today is: ' + pt)
    d = ProductDeltaMergeOnline()
    d.run_all(pt, offline=False)
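One caveat in save_result: the REPLACE statement is assembled with str.format, so a version string containing a quote character would break the SQL. A safer variant is to let SQLAlchemy bind the values; this is a sketch against the engine and result objects above, not the original code:

from sqlalchemy import text

sql_rep = text(
    "replace into stats_product_delta_merge"
    "(product_id,delta_id,origin_version,target_version,count_check,"
    "count_download,count_downfail,count_upgrade,count_upgradefail,pt) "
    "values(:product_id,:delta_id,:origin_version,:target_version,:count_check,"
    ":count_download,:count_downfail,:count_upgrade,:count_upgradefail,:pt)")

with engine.begin() as conn:                      # single transaction, committed on exit
    for row in result.to_dict(orient='records'):  # one parameter dict per changed row
        conn.execute(sql_rep, row)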

The overall flow:
compute the per-project check/download/upgrade statistics, subtract the previous run's results from them,
and write what remains to the stats_product_delta_merge table.
Table structure:

CREATE TABLE `stats_product_delta_merge` (
  `id` bigint(30) unsigned NOT NULL AUTO_INCREMENT,
  `product_id` bigint(30) NOT NULL COMMENT 'project id',
  `delta_id` bigint(30) NOT NULL COMMENT 'delta relation id',
  `origin_version` varchar(100) NOT NULL DEFAULT '' COMMENT 'source version',
  `target_version` varchar(100) NOT NULL DEFAULT '' COMMENT 'target version',
  `count_check` int(20) NOT NULL DEFAULT 0 COMMENT 'successful checks',
  `count_download` int(20) NOT NULL DEFAULT 0 COMMENT 'successful downloads',
  `count_upgrade` int(20) NOT NULL DEFAULT 0 COMMENT 'successful upgrades',
  `count_downfail` int(20) NOT NULL DEFAULT 0 COMMENT 'failed downloads',
  `count_upgradefail` int(20) NOT NULL DEFAULT 0 COMMENT 'failed upgrades',
  `pt` date NOT NULL COMMENT 'reporting date (year-month-day)',
  `update_time` datetime NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'last updated time',
  `create_time` datetime NOT NULL DEFAULT current_timestamp() COMMENT 'creation time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `product_delta_pt` (`product_id`,`delta_id`,`pt`,`origin_version`,`target_version`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The unique index on (product_id, delta_id, pt, origin_version, target_version) is required: MySQL's REPLACE INTO only replaces an existing row, rather than appending a new one, when the incoming row collides with a PRIMARY KEY or UNIQUE index.
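If the table was created earlier without that key, it can be added in place; a one-off sketch using the engine above (this assumes the existing rows are already unique on those five columns, otherwise the ALTER fails):

engine.execute(
    "ALTER TABLE stats_product_delta_merge"
    " ADD UNIQUE KEY product_delta_pt"
    " (product_id, delta_id, pt, origin_version, target_version)")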
