进程池与线程池出发点一样,都是考虑多核情况下任务的并行处理;
从多进程和多线程编程的区别上看,多线程有许多的同步、互斥的方法,较擅长于异步协作;而多进程同步、互斥的方法相对比较麻烦,则更多地考虑上下文独立执行;
从Nginx使用线程池/进程池处理大并发的思路去分析,其实就是多客户端大量连接的场景;主进程监听是否有新客户端tcp连接,然后分发给工作进程去响应http请求,在这种场景下每个连接都是一个独立的上下文逻辑,每个工作进程的内容都是对等地处理http请求,这种情况就非常适合进程池的方式;

把上述的场景给抽象出来,关注于父子进程的通讯,也就是下图流程:

Master主进程给Worker子进程分发任务,通讯的方式用的就是单向管道的方式;

分发任务这块,可以有随机分发、轮流分发、计分板等多种方法,具体可以结合业务进行设计;

本节根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;

在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;

/* Process struct */
typedef struct process
{char name[SIZE_NAME_NORMAL];                    /* Process name */pid_t pid;                                      /* Process id */int pipefd[2];                                  /* Connection between master/slave */size_t score;                                   /* Score record */
} process_t;/* Program instance */
typedef struct instance
{char prg_name[SIZE_NAME_LONG];                  /* Program name with path */char cfg_name[SIZE_NAME_LONG];                  /* Configure name with path */u16 process_num;                                /* Sub process number */u16 process_idx;                                /* Current process index */struct process *proc;                           /* Process struct */
} instance_t;

进程池的初始化函数,根据给定的Worker进程数开启子进程;

int process_pool(instance_t *pinst, u16 process_num)
{int ret = FAILURE;int ix  = 0;int status = 0;if ( !pinst || !process_num ) {printf("NULL\n");goto _E1;}signal(SIGINT,  __sig_quit);signal(SIGTERM, __sig_quit);pinst->process_idx = 0;pinst->process_num = process_num;pinst->proc = (process_t *)calloc(process_num + 1, sizeof(process_t));if ( !pinst->proc ) {printf("Alloc process pool struct failed\n");goto _E1;}for ( ix = 1; ix <= process_num; ix++ ) {int bufsize = 1;ret = pipe(pinst->proc[ix].pipefd);if ( SUCCESS != ret ) {printf("socketpair\n");goto _E2;}printf("Setup worker#%u\n", ix);pinst->proc[ix].pid = fork();if ( pinst->proc[ix].pid < 0 ) {printf("fork\n");goto _E2;}else if ( pinst->proc[ix].pid > 0 ) {/* Father */CLOSE_FD(pinst->proc[ix].pipefd[0]);continue;}else {/* Child */CLOSE_FD(pinst->proc[ix].pipefd[1]);pinst->process_idx = ix;ret = __worker(pinst);goto _E2;}}ret = __master(pinst);/* Waiting workers */for ( ix = 1; ix <= pinst->process_num; ix++ ) {waitpid(pinst->proc[ix].pid, &status, WNOHANG);}_E2:for ( ix = 1; ix <= pinst->process_num; ix++ ) {CLOSE_FD(pinst->proc[ix].pipefd[1]);CLOSE_FD(pinst->proc[ix].pipefd[0]);}FREE_POINTER(pinst->proc);
_E1:return ret;
}

然后就是Master Worker的实现,Master就是简单地进行一个事件分发,通过约定的协议"ABC"代表三种工作命令,“Q“代表退出进程;

static int __master(instance_t *pinst)
{int ret  = 0;int fd   = 0;int ix   = 0;int roll = 0;char c = 0;#define __offset(pinst) ((pinst)->proc[(pinst)->process_idx])
#define __round_robin(pinst, roll) \((pinst)->proc[((roll) % (pinst)->process_num) + 1].pipefd[1])printf("Master#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {/* Get pipe fd by round-robin */fd = __round_robin(pinst, ++roll);c = 'A' + roll % 3; // 'A'/'B'/'C'ret = write(fd, &c, 1);if ( ret <= 0 ) {return FAILURE;}sleep(1);}/* Tell all workers to quit */for ( ix = 1; ix <= pinst->process_num; ix++ ) {c = 'Q';write(__round_robin(pinst, ++roll), &c, 1);}printf("Master#%u shutdown\n", pinst->process_idx);return SUCCESS;
}
static int __worker(instance_t *pinst)
{int fd = __offset(pinst).pipefd[0];int ix = 0;ssize_t read_byte = FAILURE;char buffer[1024] = {0};printf("Worker#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {read_byte = read(fd, buffer, sizeof(buffer));if ( read_byte <= 0 ) {if ( errno == EAGAIN || errno == EINTR ) {continue;}return FAILURE;}for ( ix = 0; ix < read_byte; ix++ ) {switch ( buffer[ix] ) {case 'A':case 'B':case 'C':__offset(pinst).score += buffer[ix];printf("Worker#%u Recv command: %c, score: %llu\n",pinst->process_idx,buffer[ix], __offset(pinst).score);break;case 'Q':printf("Quit\n");g_enable = 0;break;default:break;}}}printf("Worker#%u shutdown\n", pinst->process_idx);return SUCCESS;
}

最后带一个main方法,考虑了进程释放的问题,在这个demo中使用信号对genable进行控制;

static u8 g_enable;                         /* Running flag */
static void __sig_quit(int sig)
{g_enable = 0;
}int main(int argc, char *argv[])
{instance_t inst = {0};if ( argc < 2 ) {printf("Usage: \n\t%s < process number >\n", argv[0]);return EXIT_FAILURE;}return process_pool(&inst, atoi(argv[1]));
}

完整代码如下:

/****根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;*在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;**/#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>#define FAILURE -1
#define u8  unsigned char#define u16 unsigned  short#define SIZE_NAME_NORMAL 48#define SIZE_NAME_LONG 48#define SUCCESS 0#define CLOSE_FD(x) (close(x))#define FREE_POINTER(x) (free(x))
#define __offset(pinst) ((pinst)->proc[(pinst)->process_idx])/* 写管道*/
#define __round_robin(pinst, roll) \((pinst)->proc[((roll) % (pinst)->process_num) + 1].pipefd[1])
/****本节根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;*在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;**//* Process struct */
typedef struct process
{char name[SIZE_NAME_NORMAL];                    /* Process name */pid_t pid;                                      /* Process id */int pipefd[2];                                  /* Connection between master/slave */size_t score;                                   /* Score record */
} process_t;/* Program instance */
typedef struct instance
{char prg_name[SIZE_NAME_LONG];                  /* Program name with path */char cfg_name[SIZE_NAME_LONG];                  /* Configure name with path */u16 process_num;                                /* Sub process number */u16 process_idx;                                /* Current process index */struct process *proc;                           /* Process struct */} instance_t;static u8 g_enable;                         /* Running flag */static void __sig_quit(int sig)
{g_enable = 0;
}static int __master(instance_t *pinst)
{int ret  = 0;int fd   = 0;int ix   = 0;int roll = 0;char c = 0;printf("Master#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {/* Get pipe fd by round-robin */fd = __round_robin(pinst, ++roll);c = 'A' + roll % 3; // 'A'/'B'/'C'ret = write(fd, &c, 1);if ( ret <= 0 ) {return FAILURE;}sleep(1);}/* Tell all workers to quit */for ( ix = 1; ix <= pinst->process_num; ix++ ) {c = 'Q';write(__round_robin(pinst, ++roll), &c, 1);}printf("Master#%u shutdown\n", pinst->process_idx);return SUCCESS;
}static int __worker(instance_t *pinst)
{int fd = __offset(pinst).pipefd[0];int ix = 0;ssize_t read_byte = FAILURE;char buffer[1024] = {0};printf("Worker#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {printf("work read front......\n");read_byte = read(fd, buffer, sizeof(buffer)); /* 读管道会阻塞*/printf("work read after......\n");if ( read_byte <= 0 ) {if ( errno == EAGAIN || errno == EINTR ) {continue;}return FAILURE;}for ( ix = 0; ix < read_byte; ix++ ) {switch ( buffer[ix] ) {case 'A':case 'B':case 'C':__offset(pinst).score += buffer[ix];printf("Worker#%u Recv command: %c, score: %llu\n",pinst->process_idx,buffer[ix], __offset(pinst).score);break;case 'Q':printf("Quit\n");g_enable = 0;break;default:break;}}}printf("Worker#%u shutdown\n", pinst->process_idx);return SUCCESS;
}int process_pool(instance_t *pinst, u16 process_num)
{int ret = FAILURE;int ix  = 0;int status = 0;if ( !pinst || !process_num ) {printf("NULL\n");goto _E1;}/*** SIGINT ctrl+c 终止进程*/signal(SIGINT,  __sig_quit);/** SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和*处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这*个信号. */signal(SIGTERM, __sig_quit);pinst->process_idx = 0;pinst->process_num = process_num;pinst->proc = (process_t *)calloc(process_num + 1, sizeof(process_t));if ( !pinst->proc ) {printf("Alloc process pool struct failed\n");goto _E1;}for ( ix = 1; ix <= process_num; ix++ ) {int bufsize = 1;/*fd[0]:读管道,fd[1]:写管道*/ret = pipe(pinst->proc[ix].pipefd);if ( SUCCESS != ret ) {printf("socketpair\n");goto _E2;}printf("Setup worker#%u\n", ix);pinst->proc[ix].pid = fork();if ( pinst->proc[ix].pid < 0 ) {printf("fork\n");goto _E2;}else if ( pinst->proc[ix].pid > 0 ) {/* Father process close read fd pipe */CLOSE_FD(pinst->proc[ix].pipefd[0]);printf(".......\n");continue;}else {/* Child process close write fd pipe */CLOSE_FD(pinst->proc[ix].pipefd[1]);printf("xxxxxxxx\n");pinst->process_idx = ix;ret = __worker(pinst);goto _E2;}}ret = __master(pinst);/* Waiting workers */for ( ix = 1; ix <= pinst->process_num; ix++ ) {waitpid(pinst->proc[ix].pid, &status, WNOHANG);}_E2:for ( ix = 1; ix <= pinst->process_num; ix++ ) {CLOSE_FD(pinst->proc[ix].pipefd[1]);CLOSE_FD(pinst->proc[ix].pipefd[0]);}FREE_POINTER(pinst->proc);_E1:return ret;
}int main(int argc, char *argv[])
{instance_t inst = {0};  /*实例结构体*/if ( argc < 2 ) {printf("Usage: \n\t%s < process number >\n", argv[0]);return EXIT_FAILURE;}return process_pool(&inst, atoi(argv[1]));
}

c语言进程池原理及实现相关推荐

  1. Linux——匿名管道、命名管道及进程池概念和实现原理

    目录 一.什么是匿名管道 二.如何使用匿名管道 (一).pipe原理 (二).pipe使用 三.命名管道概念及区别 (一).什么是命名管道 (二).与匿名管道的联系和区别 四.命名管道的使用 (一). ...

  2. Linux高性能服务器编程:进程池和线程池原理及应用(有图有代码有真相!!!)

    一.问题引入 在前面编写多进程.多线程服务器时通过动态创建子进程和子线程来实现并发服务器,这样做有以下缺点: 1)动态创建进程.线程将会比较耗费时间,将导致较慢的客户响应. 2)动态创建的子进程只为一 ...

  3. python进程池的实现原理_Python基于进程池实现多进程过程解析

    1.注意:pool必须在 if __name__ == '__main__' 下面运行,不然会报错 2.多进程内出现错误会直接跳过该进程,并且默认不会打印错误信息 3.if__name__下面的数据需 ...

  4. Python实验项目1例:使用进程池统计指定范围内素数的个数

    本周赠书活动:董付国老师Python系列教材赠书活动(40本) -------------------------------- 适用专业: 适用于计算机.网络工程.软件工程等相关专业,其他专业选做. ...

  5. java并发包线程池原理分析锁的深度化

    java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素 ...

  6. 【5】线程池原理分析

    目录 知识点1:并发包 1.(计数器)CountDownLatch 2.(屏障)CyclicBarrier 3.(计数信号量)Semaphore (1)案例 4.并发队列 5.阻塞队列与非阻塞队 (1 ...

  7. Python线程池、进程池的介绍与使用

    文章目录 1.问题背景 2.单线程→\rightarrow→多线程→\rightarrow→线程池 2.1单线程简介 2.2多线程简介 2.3线程池介绍 2.3.1复用线程 2.3.2线程池的使用 3 ...

  8. java架构学习——5. 线程池原理剖析锁的深度化

    本篇博文主要包含: 线程池的基本概念 ThreadPoolExecutor 线程池四种创建方式 -newCachedThreadPool:可缓存线程池 -newFixedThreadPool:定长线程 ...

  9. Golang 侧数据库连接池原理和参数调优

    Golang 侧数据库连接池原理和参数调优 文章目录 Golang 侧数据库连接池原理和参数调优 数据库连接池 数据库连接池的设计 Go 的数据库连接池 Go 数据库连接池的设计 建立连接 释放连接 ...

  10. python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现

    概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...

最新文章

  1. go mod tidy 下载依赖包问题
  2. mysql复制架构迁移到pxc_mysql复制(高可用架构方案的基础)
  3. java中main方法前的public static void及其后面的(String[] args)【笔记自用】
  4. 2019-06-12 学习日记 day33 JDBC
  5. 远程服务器部署spring boot 项目(centos7为例)
  6. 求完全二叉树结点个数(leetcode 222)
  7. c++图书管理系统_我用Python帮学校写了一款图书管理系统!教导员居然请我吃饭
  8. 133个Java面试问题列表
  9. Lenovo Server Guide Install Windows Server 2008 R2
  10. 理一理IP子网划分和VLSM
  11. 自建 IPA 分发平台
  12. Python技术练习------自动化处理费用表
  13. 微信 百度云 服务器繁忙,百度网盘搜索功能失效提示操作过于频繁如何解决
  14. FIBOS社区发起人 响马:一个“极客硬核老炮儿”是怎样的?
  15. 腾讯产品策划运营类职位笔试题和参考答案
  16. 拼图android系统程序,2021手机照片拼图软件
  17. java大文件pdf水印_java – 如何扩展PDF的页面大小以添加水印?
  18. 手游创业是新的风口吗
  19. IT人才外包的合作流程是怎样的?
  20. 3G终端变局:安卓崛起 联通高调摆脱苹果

热门文章

  1. RDV需要什么服务器系统,锐起RDV的教程
  2. 如何结束vbs的代码
  3. 老罗Android开发视频教程 (android常用布局介绍)5集集合
  4. P2P协议:我下小电影,99%急死你
  5. SmartQ 智器—公司介绍
  6. android模拟器命令行,夜神安卓模拟器命令行整理贴
  7. AIDE手机编程初级教程(零基础向) 1.1 认识我的第一个应用
  8. 黑群晖nas安装保姆级教程
  9. 怎么让Firefox,chrome 等浏览器识别维语,哈语等字体
  10. 云队友丨抖音之后,互联网失去创造力