UDP是无状态的,无法用TCP一样的并发服务器。我们可以用消息队列的方式模拟下。

首先,我们看消息队列节点

typedef struct msg_buf
{int sockfd;struct sockaddr_in their_addr;  /* 连接对方的地址信息 */int sin_size;char buf[BUFF_SIZE];    size_t len;struct msg_buf *next;
}msgbuf_t;

关于分配与释放的接口,比较习惯这样的方式了

msgbuf_t *get_msgbuf()
{return (msgbuf_t *)malloc(sizeof(struct msg_buf));
}
void put_msgbuf(msgbuf_t *msg)
{free(msg);
}

服务线程相关,只有一个线程

struct msg_buf *msg_mbox;
sem_t recv_sem;
pthread_mutex_t recv_mutex;
pthread_t recv_pid;

上述参数的初始化

void server_init()
{sem_init(&recv_sem,0,0);pthread_mutex_init(&recv_mutex,NULL);pthread_mutex_init(&send_mutex,NULL);pthread_create(&recv_pid,NULL,recv_thread,NULL);}

还有个void server_destroy()处理方法。

通过atexit在main中注册。

消息队列发送

void msg_post(struct msg_buf *msg)
{pthread_mutex_lock(&recv_mutex);if(msg_mbox){//链表插入操作msg->next = msg_mbox->next;msg_mbox->next = msg;}else{msg_mbox = msg;msg->next = NULL;}pthread_mutex_unlock(&recv_mutex);sem_post(&recv_sem);
}

最后调用

sem_post通知阻塞线程处理。

处理接收数据包线程——线程只是是实现了echo功能。

View Code

void *recv_thread(void *arg)
{static int cnt = 0;while(1){sem_wait(&recv_sem); pthread_mutex_lock(&recv_mutex);struct msg_buf *msg = msg_mbox;msg_mbox = msg->next;pthread_mutex_unlock(&recv_mutex);//处理msg消息int retval = sendto(msg->sockfd, msg->buf,msg->len, 0,(struct sockaddr *)&(msg->their_addr), msg->sin_size);printf("Received %d from %s\n%s\n",++cnt,inet_ntoa((msg->their_addr).sin_addr),msg->buf);put_msgbuf(msg);}
}

main中循环

View Code

int main(int argc,char *argv[])
{int sockfd;                     /* 数据端口 */struct sockaddr_in my_addr;     /* 自身的地址信息 */int  retval;if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {perror("socket");exit(1);}/* 设置地址可复用 */int option = 1;setsockopt( sockfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option) );my_addr.sin_family = AF_INET;my_addr.sin_port = htons(MYPORT); /* 网络字节顺序 */my_addr.sin_addr.s_addr = INADDR_ANY; /* 自动填本机IP */bzero(&(my_addr.sin_zero), 8); /* 其余部分置0 */if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {perror("bind");exit(1);}/*** 其它初始化 * */server_init();/* 主循环 */while(1) {struct msg_buf     *recvmsg = get_msgbuf();size_t len = sizeof(recvmsg->buf);char *buf  = recvmsg->buf;struct sockaddr_in their_addr; int sin_size;memset(buf,len,0);retval = recvfrom(sockfd, buf, len, 0,(struct sockaddr *)&their_addr, &sin_size);printf("%s\t%s\n",inet_ntoa(their_addr.sin_addr),buf);if (retval == 0) {perror ("recvfrom");close(sockfd);break;}////封装消息recvmsg->their_addr = their_addr;recvmsg->sin_size = sin_size;recvmsg->sockfd = sockfd;recvmsg->len = retval;//发送给某消息处理
        msg_post(recvmsg);}return 0;
}

Over!

转载于:https://www.cnblogs.com/westfly/archive/2012/04/13/2446461.html

基于消息队列的UDP并发服务器v1相关推荐

  1. java抢单功能_基于消息队列的高并发抢单功能实现方法与流程

    本发明涉及嵌入式软件中间件,具体涉及一种基于消息队列的高并发抢单功能实现方法. 背景技术: 中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络 ...

  2. C10k破局(一)——线程池和消息队列实现高并发服务器

    一.C10k的由来 互联网的基础就是网络通信,早期的互联网可以说是一个小群体的集合.互联网还不够普及,用户也不多,一台服务器同时在线100个用户估计在当时已经算是大型应用了,所以并不存在什么 C10K ...

  3. activiti异步执行_对基于消息队列的Activiti异步执行器进行基准测试

    activiti异步执行 一点历史 永无休止的一件事是,Activiti在某些非常大的规模的大型组织中的使用方式. 过去,这导致了各种优化和重构,其中包括异步执行器-替换旧的作业执行器. 对于未启动的 ...

  4. 对基于消息队列的Activiti异步执行器进行基准测试

    一点历史 永不停息​​的一件事是,Activiti如何以惊人的规模在一些大型组织中使用. 过去,这导致了各种优化和重构,其中包括异步执行器-替换旧的作业执行器. 对于未启动的用户:这些执行器在流程实例 ...

  5. 消息队列,高并发的救火员

    文章目录 一.前言 二.消息队列三功能 2.1 异步通信 2.2 服务解耦 2.3 流量控制 三.消息队列的两种模型 3.1 队列模型 3.2 发布/订阅模型 3.3 小结 四.消息队列的三个独有设置 ...

  6. 基于消息队列的分布式事务解决方案

    前两天发了工资,第一反应是想着要给远方的女朋友一点惊喜!于是打开了平安银行的APP给女朋友转点钱!填写上对方招商银行卡的卡号.开户名,一键转账!搞定!在我点击的那瞬间,就收到了app的账户变动的提醒, ...

  7. 基于消息队列 RocketMQ 的大型分布式应用上云较佳实践

    作者|绍舒 审核&校对:岁月.佳佳 编辑&排版:雯燕 前言 消息队列是分布式互联网架构的重要基础设施,在以下场景都有着重要的应用: 应用解耦 削峰填谷 异步通知 分布式事务 大数据处理 ...

  8. 基于消息队列 RocketMQ 的大型分布式应用上云最佳实践

    简介:Apache RocketMQ 作为阿里巴巴开源的支撑万亿级数据洪峰的分布式消息中间件,在众多行业广泛应用.在选型过程中,开发者一定会关注开源版与商业版的业务价值对比. 那么,今天就围绕着商业版 ...

  9. 基于消息队列 RocketMQ 的大型分布式应用上云实践

    简介: Apache RocketMQ 作为阿里巴巴开源的支撑万亿级数据洪峰的分布式消息中间件,在众多行业广泛应用.在选型过程中,开发者一定会关注开源版与商业版的业务价值对比. 那么,今天就围绕着商业 ...

最新文章

  1. 将LSTM与word2vec结合实现中文自动写作
  2. 1003个微生物基因组数据发布
  3. You're AllSet! 以多重集函数角度重新检视超图GNN
  4. DS18B20 理解与操作源码
  5. 进程及 fork() 系统调用详解
  6. java 插入mysql 日期_Java日期-插入数据库
  7. OAuth 2.0 - Authorization Code授权方式详解
  8. 阿里高级技术专家张建飞:深度剖析领域模型vs数据模型的用法
  9. python post上传大文件分片上传_基于七牛 用python实现分片上传 创建文件报错701...
  10. Flutter InteractiveViewer 支持平移和缩放子Widget
  11. (03)FPGA发展前景
  12. UVa11542 - Square(gauss)
  13. 「14」支持向量机——我话说完,谁支持?谁反对?
  14. 构建自己的不可替代性
  15. Python微信爬虫_00
  16. [嵌入式学习必备网站分享]嵌入式开发必须收藏的二十个网站 内附超链接 实用 嵌入式单片机学习网站
  17. 海康大华摄像头GB/T28181接入国标视频平台如何选择主码流还是子码流
  18. react实现关于文本框的双向绑定
  19. Word分栏在排版中的应用
  20. ubuntu18.04 xamp框架搭建

热门文章

  1. h5 uniapp history模式下刷新页面404
  2. php str_replace替换特殊字符
  3. java 解析xml字符串的_java 解析xml字符串
  4. seata使用报错no available service found in cluster ‘default‘
  5. java 内存泄漏样例_一次线上Java应用内存泄漏分析实例
  6. javac参数 编译警告关闭_JVM之JIT即时编译
  7. linux下抓包工具 wireshark,网络抓包工具Wireshark的简单使用
  8. 档案电子封装包Java类,email: Android电子邮件库(基于JavaMail封装)
  9. 虚拟机上的linux作为服务器吗,linux ftp服务器 虚拟机做服务器如何实现
  10. openGauss 学习环境部署(docker方式),并使用dbeaver进行连接