1. reactor模型,本质上讲管理网络IO

使用下面这个结构用来管理我们的IO

struct ntyreactor {int epfd;struct ntyevent *events;
};

下面这段代码比较核心(我们关心的事件与发生的事件发生时,才去调用)

if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);
}


2. 代码实现:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>#include <fcntl.h>
#include <unistd.h>
#include <errno.h>#define BUFFER_LENGTH        4096
#define MAX_EPOLL_EVENTS    1024
#define SERVER_PORT            8888typedef int NCALLBACK(int ,int, void*);struct ntyevent {int fd;int events;void *arg;int (*callback)(int fd, int events, void *arg);int status;char buffer[BUFFER_LENGTH];int length;long last_active;
};struct ntyreactor {int epfd;struct ntyevent *events;
};int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {ev->fd = fd;ev->callback = callback;ev->events = 0;ev->arg = arg;ev->last_active = time(NULL);return ;}//nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
int nty_event_add(int epfd, int events, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};ep_ev.data.ptr = ev;ep_ev.events = ev->events = events;int op;if (ev->status == 1) {op = EPOLL_CTL_MOD;} else {op = EPOLL_CTL_ADD;ev->status = 1;}if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);return -1;}return 0;
}int nty_event_del(int epfd, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};if (ev->status != 1) {return -1;}ep_ev.data.ptr = ev;ev->status = 0;epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);return 0;
}int recv_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);nty_event_del(reactor->epfd, ev);if (len > 0) {ev->length = len;ev->buffer[len] = '\0';printf("C[%d]:%s\n", fd, ev->buffer);nty_event_set(ev, fd, send_cb, reactor);nty_event_add(reactor->epfd, EPOLLOUT, ev);} else if (len == 0) {close(ev->fd);printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return len;
}int send_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = send(fd, ev->buffer, ev->length, 0);if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);nty_event_del(reactor->epfd, ev);nty_event_set(ev, fd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, ev);} else {close(ev->fd);nty_event_del(reactor->epfd, ev);printf("send[fd=%d] error %s\n", fd, strerror(errno));}return len;
}int accept_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;if (reactor == NULL) return -1;struct sockaddr_in client_addr;socklen_t len = sizeof(client_addr);int clientfd;if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {}printf("accept: %s\n", strerror(errno));return -1;}int i = 0;do {for (i = 0;i < MAX_EPOLL_EVENTS;i ++) {if (reactor->events[i].status == 0) {break;}}if (i == MAX_EPOLL_EVENTS) {printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);break;}int flag = 0;if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);break;}nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);//设置注册函数nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);} while (0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);return 0;}int init_sock(short port) {int fd = socket(AF_INET, SOCK_STREAM, 0);fcntl(fd, F_SETFL, O_NONBLOCK);struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = htonl(INADDR_ANY);server_addr.sin_port = htons(port);bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));if (listen(fd, 20) < 0) {printf("listen failed : %s\n", strerror(errno));}return fd;
}int ntyreactor_init(struct ntyreactor *reactor) {if (reactor == NULL) return -1;memset(reactor, 0, sizeof(struct ntyreactor));reactor->epfd = epoll_create(1);if (reactor->epfd <= 0) {printf("create epfd in %s err %s\n", __func__, strerror(errno));return -2;}reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));if (reactor->events == NULL) {printf("create epfd in %s err %s\n", __func__, strerror(errno));close(reactor->epfd);return -3;}
}int ntyreactor_destory(struct ntyreactor *reactor) {close(reactor->epfd);free(reactor->events);}int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {if (reactor == NULL) return -1;if (reactor->events == NULL) return -1;nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);//给监听事件+accpternty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);//将监听事件,添加到红黑树return 0;
}int ntyreactor_run(struct ntyreactor *reactor) {if (reactor == NULL) return -1;if (reactor->epfd < 0) return -1;if (reactor->events == NULL) return -1;struct epoll_event events[MAX_EPOLL_EVENTS+1];int checkpos = 0, i;while (1) {long now = time(NULL);for (i = 0;i < 100;i ++, checkpos ++) {if (checkpos == MAX_EPOLL_EVENTS) {checkpos = 0;}if (reactor->events[checkpos].status != 1) {continue;}long duration = now - reactor->events[checkpos].last_active;if (duration >= 60) {close(reactor->events[checkpos].fd);printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);nty_event_del(reactor->epfd, &reactor->events[checkpos]);}}int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);if (nready < 0) {printf("epoll_wait error, exit\n");continue;}for (i = 0;i < nready;i ++) {struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg);}if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);}}}
}int main(int argc, char *argv[]) {unsigned short port = SERVER_PORT;if (argc == 2) {port = atoi(argv[1]);}int sockfd = init_sock(port);struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));ntyreactor_init(reactor);//初始化reactorntyreactor_addlistener(reactor, sockfd, accept_cb);//添加监听事件ntyreactor_run(reactor);ntyreactor_destory(reactor);close(sockfd);return 0;
}

reactor ---- 反应堆模型相关推荐

  1. epoll服务器反应堆模型

    常规的epoll处理 epoll是io多路复用的一种实现方式,最开始我们使用epoll是对多个fd进行管理,当epoll_wait从内核的rdllist就绪链表中取出一定数量的poll_event时, ...

  2. 【Netty】主从反应器 ( Reactor ) 多线程模型

    文章目录 一. 主从 反应器 ( Reactor ) 多线程 模式 二. 主从 反应器 ( Reactor ) 多线程 工作流程 三. 主从 反应器 ( Reactor ) 多线程 优缺点分析 四. ...

  3. reactor线程模型_简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...

  4. reactor多线程模型_Netty运用Reactor模式到极致

    常见的reactor模式有以下三种 单线程reactor 多线程reactor 主从reactor 1.单线程reactor ractor 单线程模式是指所有的I/O操作都在一个NIO线程完成,该线程 ...

  5. 浅谈Reactor 线程模型

    Reactor 的线程模型有三种:单线程模型.多线程模型.主从多线程模型.首先来看一下单线程模型,如下图所示: 所谓单线程, 即Acceptor 处理和andler 处理都在同一个线程中处理.这个模型 ...

  6. reactor多线程模型_网络编程模型的演进之路

    在没有IO多路复用的模型的情况下,为了支持高并发采取以下网络模型 一:阻塞IO+多线程 client连接服务器,服务器有一个线程阻塞的调用accept,accept接收到连接后,创建一个线程来读写读写 ...

  7. Netty Reactor线程模型与EventLoop详解

    本文来说下Netty Reactor线程模型与EventLoop 文章目录 EventLoop事件循环 任务调度 线程管理 线程分配 非阻塞传输 阻塞传输 Netty线程模型 单Reactor单线程模 ...

  8. Reactor线程模型

    一 传统阻塞线程模型 第一:服务器端有一个Acceptor线程接收客户端请求 第二:Acceptor接收到每一个客户端请求后,为每一个线程分配一个线程处理客户端请求 缺点: 第一:当数据量很大或者客户 ...

  9. 主从reactor 多线程模型

    相比多线程reactor模型,主从reactor多线程模型拥有了一个独立处理 SocketChannel 连接的线程池,当客户端从Acceptor建立连接之后,便将该连接绑定到subreactor 线 ...

  10. reactor线程模型_面试一文搞定JAVA的网络IO模型

    1,最原始的BIO模型 该模型的整体思路是有一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客 ...

最新文章

  1. Windows 7 RC Build 7100 使用报告
  2. java环境变量中classpath是必须配置吗
  3. 三种场景不建议放在关系型数据库中
  4. 开箱即用的安全方案:MaxCompute数据安全方案介绍
  5. epoll原理_Epoll源码阅读手札
  6. chrome浏览器 提示Adobe Flash Player未安装的解决方法
  7. matlab中平方根法,平方根法和改进的平方根法解线性方程组(Matlab程序)
  8. 十五届恩智浦智能车-四十天做四轮-调车日记
  9. Matlab中散点图绘制详细教程scatter函数(附matlab代码)
  10. 协议号(网络层)、端口号(传输层)详解
  11. EasyRecovery2020数据恢复软件激活码序列号秘钥下载及使用恢复教程
  12. HTTP 405 错误 – 方法不被允许 (Method not allowed)
  13. 适用于软件工程的定律Augustine's laws
  14. web安全(3)-- ClickJacking(点击劫持)
  15. 引子——漂在中关村 1
  16. XCode下Swift – WebView IOS demo
  17. bzoj5442: [Ceoi2018]Global warming
  18. 修改linux系统的时间EDT和EST为CST
  19. 12-用户及权限管理
  20. xargs的详细解释,记得收藏,相信我你会需要的

热门文章

  1. Android ViewPager + PagerAdapter 实现轮播图
  2. SqlSession介绍
  3. 神经网络模型压缩优化方法
  4. Spring常用注解用法总结
  5. Web调取摄像头拍照
  6. c# 程序调试出现“未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序。”...
  7. JavaScript实现对象的深度克隆及typeof和instanceof【简洁】【分享】
  8. 一个hitbernate配置文件,带几个表(注意mapping);如果连接字符串没有设置utf-8,向insert mysql 会产生乱码(utf8 或 utf-8)...
  9. linux 输入--输出--重定向 stdin/stdout/stderr
  10. Tomcat SSL Configuration