基于Select/Poll实现并发服务器(一)

9.3 Select/Poll概述

在LWIP中,如果要实现并发服务器,可以基于Sequentaial API来实现,这种方式需要使用多线程,也就是为每个连接创建一个线程来处理数据。而在资源受限的嵌入式设备来说,如果为每个连接都创建一个线程,这种资源的消耗是巨大的,因此,我们需要换一种实现思路,也就是使用IO多路复用的机制来实现,也就是select机制

Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。

值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSD Unix系统继承而来,poll则是从System V Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。

9.3.1 Select函数

在BSD Socket 中,select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);

【参数说明】

  • nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。
  • readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。
  • writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。
  • exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。
  • timeout 参数是一个指向 struct timeval 类型的指针,它可以使 select()在等待 timeout 时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态:

(1) 若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;
(2) 若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;
(3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。

timeval 结构体定义

struct timeval
{int tv_sec;/* 秒 */int tv_usec;/* 微妙 */
};

【返回值】

  • int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1

下列操作用来设置、清除、判断文件描述符集合。

FD_ZERO(fd_set *set);//清除一个文件描述符集。
FD_SET(int fd,fd_set *set);//将一个文件描述符加入文件描述符集中。
FD_CLR(int fd,fd_set *set);//将一个文件描述符从文件描述符集中清除。
FD_ISSET(int fd,fd_set *set);//判断文件描述符是否被置位

fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。

select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。

9.3.2 Poll函数

poll的函数原型:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

【参数说明】

  • fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。

struct pollfd原型如下:

typedef struct pollfd {int fd;                 // 需要被检测或选择的文件描述符short events;           // 对文件描述符fd上感兴趣的事件short revents;          // 文件描述符fd上当前实际发生的事件
} pollfd_t;

其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。

  • nfds:记录数组fds中描述符的总数量。
  • timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回,和select函数是类似的。

【返回值】

  • int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;

poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。

9.4 LWIP 的select/poll实现

好了,接下来看看LWIP是如何实现select/poll的。

9.4.1 lwip_select实现

目前LWIP已经完全实现select,它是基于信号量的机制来实现的,函数名是lwip_select

LWIP实现Select的基本流程如下:

1.依次检套接字集合中的每个套接字的事件表示,若有效,则记录该套接字。

2.若存在一个或多事件,则返回,否则创建一个信号量并阻塞等待,记录信号量的结构体是select_cb_list,是一个链表,在[sockets.c]文件中定义的:

static struct lwip_select_cb *select_cb_list;//管理select的链表

lwip_select_cb原型如下:

/** Description for a task waiting in select */
struct lwip_select_cb {/** Pointer to the next waiting task */struct lwip_select_cb *next;/** Pointer to the previous waiting task */struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT/** readset passed to select */fd_set *readset;/** writeset passed to select */fd_set *writeset;/** unimplemented: exceptset passed to select */fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL/** fds passed to poll; NULL if select */struct pollfd *poll_fds;/** nfds passed to poll; 0 if select */nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL *//** don't signal the same semaphore twice: set to 1 when signalled */int sem_signalled;//是否释放信号领/** semaphore to wake up a task waiting for select */SELECT_SEM_T sem;//select阻塞的信号量
};

3.当套接字集合初始化,会向netconn结构注册回调函数event_callback,当有是事件发生时,回调函数就被被执行,而且回调函数会遍历select_cb_list,如果套接字在select_cb_list中,则select_cb_list释放一个信号量。

好了,接下来看看LWIP的select具体实现,其原型如下:

int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout)
{u32_t waitres = 0;//记录select等待时间int nready;fd_set lreadset, lwriteset, lexceptset;//记录发生事件的套接字u32_t msectimeout;int i;int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREADint waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEXfd_set used_sockets;
#endifSYS_ARCH_DECL_PROTECT(lev);LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {set_errno(EINVAL);return -1;}lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);/* Go through each socket in each list to count number of sockets whichcurrently match *///检测套接字集合中是否发生事件nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);if (nready < 0) {/* one of the sockets in one of the fd_sets was invalid */set_errno(EBADF);lwip_select_dec_sockets_used(maxfdp1, &used_sockets);return -1;} else if (nready > 0) {/* one or more sockets are set, no need to wait */LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));} else {/* If we don't have any current events, then suspend if we are supposed to */if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));/* This is OK as the local fdsets are empty and nready is zero,or we would have returned earlier. */} else {/* None ready: add our semaphore to list:We don't actually need any dynamic memory. Our entry on thelist is only valid while we are in this function, so it's okto use local variables (unless we're running in MPU compatiblemode). */API_SELECT_CB_VAR_DECLARE(select_cb);API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));API_SELECT_CB_VAR_REF(select_cb).readset = readset;API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREADAPI_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {/* failed to create semaphore */set_errno(ENOMEM);lwip_select_dec_sockets_used(maxfdp1, &used_sockets);API_SELECT_CB_VAR_FREE(select_cb);return -1;}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));/* Increase select_waiting for each socket we are interested in */maxfdp2 = maxfdp1;for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {if ((readset && FD_ISSET(i, readset)) ||(writeset && FD_ISSET(i, writeset)) ||(exceptset && FD_ISSET(i, exceptset))) {struct lwip_sock *sock;SYS_ARCH_PROTECT(lev);sock = tryget_socket_unconn_locked(i);if (sock != NULL) {sock->select_waiting++;//读写异常通知,并且socket是存在的,则会将select_wainting增加1 if (sock->select_waiting == 0) {/* overflow - too many threads waiting */sock->select_waiting--;nready = -1;maxfdp2 = i;SYS_ARCH_UNPROTECT(lev);done_socket(sock);set_errno(EBUSY);break;}SYS_ARCH_UNPROTECT(lev);done_socket(sock);} else {/* Not a valid socket */nready = -1;maxfdp2 = i;SYS_ARCH_UNPROTECT(lev);set_errno(EBADF);break;}}}if (nready >= 0) {/* Call lwip_selscan again: there could have been events betweenthe last scan (without us on the list) and putting us on the list! *///执行完上述操作,再次扫描一次是否有socket有事件产生nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);if (!nready) {/* Still none ready, just wait to be woken */if (timeout == 0) {/* Wait forever */msectimeout = 0;} else {long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));if (msecs_long <= 0) {/* Wait 1ms at least (0 means wait forever) */msectimeout = 1;} else {msectimeout = (u32_t)msecs_long;}}//休眠指定时间,让出cpu控制权waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREADwaited = 1;
#endif}}/* Decrease select_waiting for each socket we are interested in */for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {if ((readset && FD_ISSET(i, readset)) ||(writeset && FD_ISSET(i, writeset)) ||(exceptset && FD_ISSET(i, exceptset))) {struct lwip_sock *sock;SYS_ARCH_PROTECT(lev);sock = tryget_socket_unconn_locked(i);if (sock != NULL) {/* for now, handle select_waiting==0... */LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);if (sock->select_waiting > 0) {sock->select_waiting--;//休眠结束, 将对应socket->select_waiting减1}SYS_ARCH_UNPROTECT(lev);done_socket(sock);} else {SYS_ARCH_UNPROTECT(lev);/* Not a valid socket */nready = -1;set_errno(EBADF);}}}lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));#if LWIP_NETCONN_SEM_PER_THREADif (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {/* don't leave the thread-local semaphore signalled */sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);}
#else /* LWIP_NETCONN_SEM_PER_THREAD */sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */API_SELECT_CB_VAR_FREE(select_cb);if (nready < 0) {/* This happens when a socket got closed while waiting */lwip_select_dec_sockets_used(maxfdp1, &used_sockets);return -1;}if (waitres == SYS_ARCH_TIMEOUT) {/* Timeout */LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));/* This is OK as the local fdsets are empty and nready is zero,or we would have returned earlier. */} else {/* See what's set now after waiting */nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));}}}lwip_select_dec_sockets_used(maxfdp1, &used_sockets);set_errno(0);if (readset) {*readset = lreadset;}if (writeset) {*writeset = lwriteset;}if (exceptset) {*exceptset = lexceptset;}return nready;
}

以上代码最核心的就是 socket->select_waiting 加1和减1的地方,当socket存在且的确需要监听事件,且并不是进来事件就已经产生或者已经超时,一定会加1;然后线程会有可能会进行休眠;正常情况下,休眠结束后,socket->select_waiting 减1,离开该函数,socket->select_waiting 恢复原值。但是,如果在休眠期间进行了close(socket) ,则通过try_socket(socket) 获取不到socket结构体,则socket->select_waiting不会进行减1,后面执行一系列语句后,退出该函数,socket->select_waiting没有恢复原值,且比进来时大1。针对该函数,socket->select_waiting加1的次数是>=减1的次数,所以如果只要在函数退出时没有恢复原值,则socket->select_waiting永远不可能再减为0了,此时socket资源就出现了假占用,该socket再也不能被其他人使用了。

lwip_select函数实现的具体流程如下:

Select的实现有个重要的结构体lwip_sock,其原型如下:

/** Contains all internal pointers and states used for a socket */
struct lwip_sock {/** sockets currently are built on netconns, each socket has one netconn */struct netconn *conn;/** data that was left from the previous read */union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL/** number of times data was received, set by event_callback(),tested by the receive and select functions */s16_t rcvevent;/** number of times data was ACKed (free send buffer), set by event_callback(),tested by select */u16_t sendevent;/** error happened for this socket, set by event_callback(), tested by select */u16_t errevent;/** counter of how many threads are waiting for this socket using select */SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX/* counter of how many threads are using a struct lwip_sock (not the 'int') */u8_t fd_used;/* status of pending close/delete actions */u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP  1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif#ifdef SAL_USING_POSIXrt_wqueue_t wait_head;
#endif
};

在socket数据接收时,lwip_sock利用netconn相关的接收函数获得一个pbuf(对于TCP)或者一个netbuf(对于UDP)数据,而这二者封装的数据可能大于socket用户指定的数据接收长度,因此在这种情况下,这两个数据包需要暂时保存在socket中,以待用户下一次读取,这里lastdata就用于指向未被用户完全读取的数据包,而lastoffset则指向了未读取的数据在数据包中的偏移。lwip_sock最后的五个字段是为select机制实现时使用的。

lwip_socket是上层Socket API中的实现,它对netconn结构的封装和增强,描述一个具体连接。它基于内核netconn来实现所有逻辑,conn指向了与socket对应的netconn结构。Netconn原型如下:

/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);/** A netconn descriptor */
struct netconn {/** type of the netconn (TCP, UDP or RAW) */enum netconn_type type;/** current state of the netconn */enum netconn_state state;/** the lwIP internal protocol control block */union {struct ip_pcb  *ip;struct tcp_pcb *tcp;struct udp_pcb *udp;struct raw_pcb *raw;} pcb;/** the last asynchronous unreported error this netconn had */err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD/** sem that is used to synchronously execute functions in the core context */sys_sem_t op_completed;
#endif/** mbox where received packets are stored until they are fetchedby the netconn application thread (can grow quite big) */sys_mbox_t recvmbox;
#if LWIP_TCP/** mbox where new connections are stored until processedby the application thread */sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX/** number of threads waiting on an mbox. This is required to unblockall threads when closing while threads are waiting. */int mbox_threads_waiting;
#endif/** only used for socket layer */
#if LWIP_SOCKETint socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO/** timeout to wait for sending data (which means enqueueing data for sendingin internal buffers) in milliseconds */s32_t send_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVTIMEO/** timeout in milliseconds to wait for new data to be received(or connections to arrive for listening netconns) */u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF/** maximum amount of bytes queued in recvmboxnot used for TCP: adjust TCP_WND instead! */int recv_bufsize;/** number of bytes currently in recvmbox to be received,tested against recv_bufsize to limit bytes on recvmboxfor UDP and RAW, used for FIONREAD */int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER/** values <0 mean linger is disabled, values > 0 are seconds to linger */s16_t linger;
#endif /* LWIP_SO_LINGER *//** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */u8_t flags;
#if LWIP_TCP/** TCP: when data passed to netconn_write doesn't fit into the send buffer,this temporarily stores the message.Also used during connect and close. */struct api_msg *current_msg;
#endif /* LWIP_TCP *//** A callback function that is informed about events for this netconn */netconn_callback callback;
};

前文已经提到,套接字集合初始化时,会向netconn结构注册回调函数event_callback,这个回调函数就是结构体netconnnetconn_callback,接下来看看netconn_callback函数原型:

/*** Callback registered in the netconn layer for each socket-netconn.* Processes recvevent (data available) and wakes up tasks waiting for select.** @note for LWIP_TCPIP_CORE_LOCKING any caller of this function* must have the core lock held when signaling the following events* as they might cause select_list_cb to be checked:*   NETCONN_EVT_RCVPLUS 数据被内核接收则会产生该事件*   NETCONN_EVT_SENDPLUS数据成功发送则产生该事件*   NETCONN_EVT_ERROR连接错误则产生该事件* This requirement will be asserted in select_check_waiters()*/
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{int s, check_waiters;struct lwip_sock *sock;SYS_ARCH_DECL_PROTECT(lev);LWIP_UNUSED_ARG(len);/* Get socket */if (conn) {s = conn->socket;if (s < 0) {/* Data comes in right away after an accept, even though* the server task might not have created a new socket yet.* Just count down (or up) if that's the case and we* will use the data later. Note that only receive events* can happen before the new socket is set up. */SYS_ARCH_PROTECT(lev);if (conn->socket < 0) {if (evt == NETCONN_EVT_RCVPLUS) {/* conn->socket is -1 on initializationlwip_accept adjusts sock->recvevent if conn->socket < -1 */conn->socket--;}SYS_ARCH_UNPROTECT(lev);return;}s = conn->socket;SYS_ARCH_UNPROTECT(lev);}sock = get_socket(s);//获取socket对应的结构if (!sock) {return;}} else {return;}check_waiters = 1;//进入临界区,根据事件来更新socket的event值SYS_ARCH_PROTECT(lev);/* Set event as required */switch (evt) {case NETCONN_EVT_RCVPLUS://数据被内核收到sock->rcvevent++;if (sock->rcvevent > 1) {check_waiters = 0;}break;case NETCONN_EVT_RCVMINUS://数据被用户读取sock->rcvevent--;check_waiters = 0;break;case NETCONN_EVT_SENDPLUS://输出发送成功if (sock->sendevent) {check_waiters = 0;}sock->sendevent = 1;break;case NETCONN_EVT_SENDMINUS://用户写入数据到缓冲区sock->sendevent = 0;check_waiters = 0;break;case NETCONN_EVT_ERROR://连接错误sock->errevent = 1;break;default:LWIP_ASSERT("unknown event", 0);break;}//事件设置完毕,唤醒阻塞的select函数if (sock->select_waiting && check_waiters) {/* Save which events are active */int has_recvevent, has_sendevent, has_errevent;has_recvevent = sock->rcvevent > 0;//数据可读事件has_sendevent = sock->sendevent != 0;//数据可写事件has_errevent = sock->errevent != 0;//数据异常事件SYS_ARCH_UNPROTECT(lev);/* Check any select calls waiting on this socket */select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);} else {SYS_ARCH_UNPROTECT(lev);}done_socket(sock);
}

综上,event_callback 的本质就是readsetwritesetexceptset集合的监听,并对rcveventsendeventerrevent的填写,并阻塞的lwip_select函数发送信号量。而lwip_select的本质就是对rcveventsendeventerrevent的读取,并执行相应的操作,lwip_select主要是通过lwip_selscan来扫描事件的。

9.4.2 lwip_poll实现

LWIP也完全实现poll,函数名是lwip_polllwip_polllwip_select的实现机制差不多,只是lwip_poll使用pollfd的结构来存储描述符的,它是基于链表来存储的,这样lwip_poll函数没有最大文件描述符数量的限制。lwip_poll函数原型如下:

int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{u32_t waitres = 0;int nready;u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREADint waited = 0;
#endifLWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",(void*)fds, (int)nfds, timeout));LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),set_errno(EINVAL); return -1;);lwip_poll_inc_sockets_used(fds, nfds);/* Go through each struct pollfd to count number of structureswhich currently match */nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);if (nready < 0) {lwip_poll_dec_sockets_used(fds, nfds);return -1;}/* If we don't have any current events, then suspend if we are supposed to */if (!nready) {API_SELECT_CB_VAR_DECLARE(select_cb);if (timeout == 0) {LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));goto return_success;}API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));/* None ready: add our semaphore to list:We don't actually need any dynamic memory. Our entry on thelist is only valid while we are in this function, so it's okto use local variables. */API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREADAPI_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {/* failed to create semaphore */set_errno(EAGAIN);lwip_poll_dec_sockets_used(fds, nfds);API_SELECT_CB_VAR_FREE(select_cb);return -1;}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));/* Increase select_waiting for each socket we are interested in.Also, check for events again: there could have been events betweenthe last scan (without us on the list) and putting us on the list! */nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);if (!nready) {/* Still none ready, just wait to be woken */if (timeout < 0) {/* Wait forever */msectimeout = 0;} else {/* timeout == 0 would have been handled earlier. */LWIP_ASSERT("timeout > 0", timeout > 0);msectimeout = timeout;}waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREADwaited = 1;
#endif}/* Decrease select_waiting for each socket we are interested in,and check which events occurred while we waited. */nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));#if LWIP_NETCONN_SEM_PER_THREADif (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {/* don't leave the thread-local semaphore signalled */sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);}
#else /* LWIP_NETCONN_SEM_PER_THREAD */sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */API_SELECT_CB_VAR_FREE(select_cb);if (nready < 0) {/* This happens when a socket got closed while waiting */lwip_poll_dec_sockets_used(fds, nfds);return -1;}if (waitres == SYS_ARCH_TIMEOUT) {/* Timeout */LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));goto return_success;}}LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:lwip_poll_dec_sockets_used(fds, nfds);set_errno(0);return nready;
}

lwip_select一样也是对事件进行扫描,只是扫描函数是lwip_pollscan而已。后面的内容就不在分析,有兴趣请参看LWIP源码。

lwip_poll函数实现的具体流程如下:

9.5并发服务器实现

前文讲解了select/poll机制在LWIP的实现,接下来将使用select/poll来实现并发服务器。这里以select为例。

select并发服务器模型

socket(...); // 创建套接字
bind(...);   // 绑定
listen(...); // 监听while(1)
{if(select(...) > 0) // 检测监听套接字是否可读{if(FD_ISSET(...)>0) // 套接字可读,证明有新客户端连接服务器  {accpet(...);// 取出已经完成的连接process(...);// 处理请求,反馈结果}}close(...); // 关闭连接套接字:accept()返回的套接字
}

因此,基于select实现的并发服务器模型如下:

从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

Server:

/********************************************************************************* @file                server.c* @author              BruceOu* @rtt version         V4.0.3* @version             V1.0* @date                2021-07-08* @blog                https://blog.bruceou.cn/* @Official Accounts   嵌入式实验楼* @brief               基于select的服务器*******************************************************************************/
#include <rtthread.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdev.h>
#include <stdio.h>
#include <string.h>#define SERVER_PORT   8888
#define BUFF_SIZE 1024static char recvbuff[BUFF_SIZE];static void net_server_thread_entry(void *parameter)
{int sfd, cfd, maxfd, i, nready, n;struct sockaddr_in server_addr, client_addr;struct netdev *netdev = RT_NULL;char sendbuff[] = "Hello client!";socklen_t client_addr_len;fd_set all_set, read_set;//FD_SETSIZE里面包含了服务器的fdint clientfds[FD_SETSIZE - 1];/* 通过名称获取 netdev 网卡对象 */netdev = netdev_get_by_name((char*)parameter);if (netdev == RT_NULL){rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);}//创建socketif ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){rt_kprintf("Socket create failed.\n");}server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);//server_addr.sin_addr.s_addr = htonl(INADDR_ANY);/* 获取网卡对象中 IP 地址信息 */server_addr.sin_addr.s_addr = netdev->ip_addr.addr;//绑定socketif (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0){rt_kprintf("socket bind failed.\n");closesocket(sfd);}rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);//监听socketif(listen(sfd, 5) == -1){rt_kprintf("listen error");}else{rt_kprintf("listening...\n");}client_addr_len = sizeof(client_addr);//初始化 maxfd 等于 sfdmaxfd = sfd;//清空fdsetFD_ZERO(&all_set);//把sfd文件描述符添加到集合中FD_SET(sfd, &all_set);//初始化客户端fd的集合for(i = 0; i < FD_SETSIZE -1 ; i++){//初始化为-1clientfds[i] = -1;}while(1){//每次select返回之后,fd_set集合就会变化,再select时,就不能使用,//所以我们要保存设置fd_set 和 读取的fd_setread_set = all_set;nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);//没有超时机制,不会返回0if(nready < 0){rt_kprintf("select error \r\n");}//判断监听的套接字是否有数据if(FD_ISSET(sfd, &read_set)){//有客户端进行连接了cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);if(cfd < 0){rt_kprintf("accept socket error\r\n");//继续selectcontinue;}rt_kprintf("new client connect fd = %d\r\n", cfd);//把新的cfd 添加到fd_set集合中FD_SET(cfd, &all_set);//更新要select的maxfdmaxfd = (cfd > maxfd)?cfd:maxfd;//把新的cfd 保存到cfds集合中for(i = 0; i < FD_SETSIZE -1 ; i++){if(clientfds[i] == -1){clientfds[i] = cfd;//退出,不需要添加break;}}//没有其他套接字需要处理:这里防止重复工作,就不去执行其他任务if(--nready == 0){//继续selectcontinue;}}//遍历所有的客户端文件描述符for(i = 0; i < FD_SETSIZE -1 ; i++){if(clientfds[i] == -1){//继续遍历continue;}//判断是否在fd_set集合里面if(FD_ISSET(clientfds[i], &read_set)){n = recv(clientfds[i], recvbuff, sizeof(recvbuff), 0);rt_kprintf("clientfd %d:  %s \r\n",clientfds[i], recvbuff);if(n <= 0){//从集合里面清除FD_CLR(clientfds[i], &all_set);//当前的客户端fd 赋值为-1clientfds[i] = -1;                }else{//写回客户端n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);if(n < 0){//从集合里面清除FD_CLR(clientfds[i], &all_set);//当前的客户端fd 赋值为-1clientfds[i] = -1;}}}}}
}static int server(int argc, char **argv)
{rt_err_t ret = RT_EOK;if (argc != 2){rt_kprintf("bind_test [netdev_name]  --bind network interface device by name.\n");return -RT_ERROR;}/* 创建 serial 线程 */rt_thread_t thread = rt_thread_create("server",net_server_thread_entry,argv[1],4096,10,10);/* 创建成功则启动线程 */if (thread != RT_NULL){rt_thread_startup(thread);}else{ret = RT_ERROR;}return ret;
}#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */

Client:【Linux版】

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
#include <netinet/in.h>#define SERVPORT 8888int main(int argc,char *argv[])
{char sendbuf[] = "Client1 : Hello Rtthread!";char recvbuf[2014];int sockfd,sendbytes;struct sockaddr_in serv_addr;//需要连接的服务器地址信息if (argc != 2){perror("init error");}//1.创建socket//AF_INET 表示IPV4//SOCK_STREAM 表示TCPif((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) {perror("socket");exit(1);}//填充服务器地址信息serv_addr.sin_family    = AF_INET; //网络层的IP协议: IPV4serv_addr.sin_port      = htons(SERVPORT); //传输层的端口号serv_addr.sin_addr.s_addr   = inet_addr(argv[1]); //网络层的IP地址: 实际的服务器IP地址bzero(&(serv_addr.sin_zero),8); //保留的8字节置零//2.发起对服务器的连接信息//三次握手,需要将sockaddr_in类型的数据结构强制转换为sockaddrif((connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(struct sockaddr))) < 0) {perror("connect failed!");exit(1);}printf("connect successful! \n");//3.发送消息给服务器端while (1){send(sockfd, sendbuf, strlen(sendbuf), 0);recv(sockfd, recvbuf, sizeof(recvbuf), 0);printf("Server : %s \n", recvbuf);sleep(2);}//4.关闭close(sockfd);}

Client:【RT-Thread版】

/********************************************************************************* @file                client.c* @author              BruceOu* @rtt version         V4.0.3* @version             V1.0* @date                2021-08-01* @blog                https://blog.bruceou.cn/* @Official Accounts   嵌入式实验楼* @brief               客户端*******************************************************************************/
#include <rtthread.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdev.h>
#include <stdio.h>
#include <string.h>#define SERVER_HOST   "192.168.101.8"
#define SERVER_PORT   8888static int client(int argc, char **argv)
{struct sockaddr_in client_addr;struct sockaddr_in server_addr;struct netdev *netdev = RT_NULL;int sockfd = -1;char sendbuf[] = "Hello RT-Thread! \r\n";char recvbuf[2014];if (argc != 2){rt_kprintf("bind_test [netdev_name]  --bind network interface device by name.\n");return -RT_ERROR;}/* 通过名称获取 netdev 网卡对象 */netdev = netdev_get_by_name(argv[1]);if (netdev == RT_NULL){rt_kprintf("get network interface device(%s) failed.\n", argv[1]);return -RT_ERROR;}if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){rt_kprintf("Socket create failed.\n");return -RT_ERROR;}/* 初始化需要绑定的客户端地址 */client_addr.sin_family = AF_INET;client_addr.sin_port = htons(8080);/* 获取网卡对象中 IP 地址信息 */client_addr.sin_addr.s_addr = netdev->ip_addr.addr;rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0){rt_kprintf("socket bind failed.\n");closesocket(sockfd);return -RT_ERROR;}rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);/* 初始化预连接的服务端地址 */server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));/* 连接到服务端 */if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0){rt_kprintf("socket connect failed!\n");closesocket(sockfd);return -RT_ERROR;}else{rt_kprintf("socket connect success!\n");}while (1){send(sockfd, sendbuf, strlen(sendbuf), 0);recv(sockfd, recvbuf, sizeof(recvbuf), 0);fputs(recvbuf, stdout);memset(recvbuf, 0, sizeof(recvbuf));rt_thread_mdelay(500);}/* 关闭连接 */closesocket(sockfd);return RT_EOK;
}#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */

接下来就是验证了,关于ART-Pi的联网部分就不再赘述了有不懂的看前面的章节。

现在ART-Pi上开启服务器:
Server:

然后开启客户端,笔者的客户端在Ubuntu上运行的:
Client:

笔者这里使用的客户端只有两个,有兴趣的也可以使用多个客户端。
当然啦,如果懒得写客户端,也可使用网络调试助手测试。

9.6总结

select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。使用select监听socket时是线性扫描的方式,即采用轮询的方式,当套接字比较多的时候,每次select都要遍历每个socket来完成调度,不管哪个socket是活跃的,都会遍历一遍,这很浪费CPU的时间,如果每个套接字都能注册一个回调函数,当套接字活跃时直接调用回调也是很方便的,这样就避免了轮训。

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

总之,select/poll能监听多个设备的文件描述符,只要有任何一个设备满足条件,select/poll就会返回,否则将进行睡眠等待



代码获取方法

1.长按下面二维码,关注公众号[嵌入式实验楼]
2.在公众号回复关键词[Art-Pi]获取资料



欢迎访问我的网站

BruceOu的哔哩哔哩
BruceOu的主页
BruceOu的博客
BruceOu的CSDN博客
BruceOu的简书

《嵌入式系统 – 玩转ART-Pi开发板(基于RT-Thread系统)》第9章 基于Select/Poll实现并发服务器(二)相关推荐

  1. fpga驱动rgb液晶屏_正点原子开拓者FPGA开发板资料连载第五十四章基于的数字识别实验...

    1)实验平台:正点原子开拓者FPGA 开发板 2)摘自<开拓者FPGA开发指南>关注官方微信号公众号,获取更多资料:正点原子 3)全套实验源码+手册+视频下载地址:http://www.o ...

  2. 赛灵思运行linux,玩转赛灵思Zedboard开发板(6):如何在Zedboard上运行linux下的应用程序?...

    描述 电子发烧友网讯:ZedBoard开发板上的Zynq是一个ARM PS(processing system, 双核A9 + 存储管理 + 外设)+ PL(programable Logic) 结构 ...

  3. STM32F103RCT6Mini开发板搭建指纹识别系统

    STM32F103RCT6Mini开发板搭建指纹识别系统 导读 模块和工具的准备 接线 1.44寸液晶屏电源接线 液晶屏数据线接线 液晶屏控制线接线 指纹识别模块AS608接线 实物图: 接线图: 移 ...

  4. 2021爱智先行者—记录一次 Spirit 1 和 IoT Pi 开发板的实战经历

    目录 前言 正文 一.IoT Pi 开发板介绍 二.开发实战 1. 连接设备 2. 搭建开发环境 3. 开始编码 4. 编译打包 5. 安装应用程序 6. LED灯开关控制演示 7. 程序升级改造 8 ...

  5. 玩转你的开发板-1.4.第1季第4部分-朱有鹏-专题视频课程

    玩转你的开发板-1.4.第1季第4部分-1586人已学习 课程介绍         本课程是<朱有鹏老师单片机完全学习系列课程>第1季第4个课程,主要内容是带领大家玩转课程配套开发板,包括 ...

  6. TurnipBit:可以带着孩子一起玩编程的MicroPython开发板!

    2019独角兽企业重金招聘Python工程师标准>>> 从小就编程"有可能不只是名人传记里才能看到的故事,现在"全民编程"已成一股热潮,那么让孩子接触编 ...

  7. ARM开发板如何安装Linux系统

    转自:http://www.eepw.com.cn/article/201611/322612_2.htm 注意:本小节假定您已经连接好开发板的和PC机之间的串口和USB口,并把开发板设置为NORFl ...

  8. 【北京迅为】《iTOP-3568开发板快速测试手册》-第2章 Android11系统功能测试

    瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器.RK3568 支持4K 解码和 1080P 编 ...

  9. 【北京迅为】《iTOP-3568开发板快速测试手册》第4章 Buildroot系统功能测试(2)

    瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器.RK3568 支持4K 解码和 1080P 编 ...

最新文章

  1. pip install 及导出安装库,批量安装库
  2. 架构漫谈读后感之软件架构师如何工作
  3. Event Tracing for Windows
  4. 基于selenium实现12306的登录操作(图形验证码识别)
  5. 树莓派3_win10下使用远程桌面连接与树莓派通信(使用VNC实现连接后)
  6. 8步教你打开Android之门 NDK入门教程
  7. 游戏开发中的数据表示
  8. linux安装thrift
  9. 补CEGUIFont_xmlHandler.cpp
  10. 远程打开其他电脑的computer management
  11. oracle用户导出和导入用不同的用户名,oracle用exp\imp导出导入,及创建表空间与用户...
  12. git cherry-pick 多个commit_Git使用爬坑记录
  13. Vagrant安装CentOS7镜像
  14. 移动web开发rem+js适配布局开发
  15. 云服务器只能显示控制台吗,云服务器控制台使用方法
  16. vue watch首次不触发的解决方案
  17. 分支-07. 比较大小(10)
  18. iPhone iOS升级完美指南
  19. < pre >标签 定义预格式化的文本
  20. debian无法使用ifconfig

热门文章

  1. 代码人生之《掌控习惯》分享
  2. Handshake - DNS域名系统的搅局者
  3. 好家伙!清华电子系大一暑假Python课程大作业上知乎热榜!竟是个CV任务
  4. 心理咨询服务微信小程序的设计与实现-计算机毕业设计
  5. V 神“继任者”排行榜第三,她一年在 Github 上为以太坊做出 1781 个贡献
  6. This is My frist Webo Happy!!!
  7. 淘宝网-接口测试白皮书V0.1
  8. 转ios icon及默认图片等
  9. oracle时间戳表达式,Oracle Timestamp类型
  10. JS中常见的 “Uncaught TypeError: XXXX is not a function” 错误解析