libpq服务端代码存放于src/backend/libpq/pqcomm.c,顶层接口函数如下所示:

 * setup/teardown:*      StreamServerPort    - Open postmaster's server port*       StreamConnection    - Create new connection with client*        StreamClose         - Close a client/backend connection*        pq_init         - initialize libpq at backend startup*      pq_comm_reset   - reset libpq during error recovery

pq_init

pq_init在后端进程启动过程中初始化libpq。其主要是在子进程中的BackendInitialize中调用。注意这里的MyProcPort指向了postmaster传递过来的PORT结构体。

static int ServerLoop(void){| -- nSockets = initMasks(&readmask);| -- for (;;){| -- if (pmState == PM_WAIT_DEAD_END)| -- else| -- selres = select(nSockets, &rmask, NULL, NULL, &timeout); // 监听新连接| -- if (selres > 0)  | -- for (i = 0; i < MAXLISTEN; i++)  | -- port = ConnCreate(ListenSocket[i]);| -- if (port){| -- BackendStartup(Port *port)| -- pid = fork_process();| -- if (pid == 0){| -- InitPostmasterChild();| -- ClosePostmasterPorts(false);| -- BackendInitialize(Port *port)| -- MyProcPort = port;| -- port->remote_host = "";| -- port->remote_port = "";| -- pq_init();                 /* initialize libpq to talk to client */| -- whereToSendOutput = DestRemote; /* now safe to ereport to client */

在后端进程,我们以非阻塞模式操作底层套接字(MyProcPort->sock,其实就是Port传递进来的sock),并在需要时使用闩锁来实现阻塞语义。 这使我们能够提供安全的可中断读取和写入。失败时使用 COMMERROR,因为 ERROR 会尝试将错误发送给客户端,这可能需要再次更改模式,从而导致无限递归。In backends (as soon as forked) we operate the underlying socket in nonblocking mode and use latches to implement blocking semantics if needed. That allows us to provide safely interruptible reads and writes. Use COMMERROR on failure, because ERROR would try to send the error to the client, which might require changing the mode again, leading to infinite recursion. 关于闩锁可参考PG服务进程(Postgres)——WaitEventSet。

void pq_init(void) { PqSendBufferSize = PQ_SEND_BUFFER_SIZE; /* initialize state variables */PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);  // 初始化发送缓冲区PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;PqCommBusy = false;PqCommReadingMsg = false;DoingCopyOut = false;    on_proc_exit(socket_close, 0); /* set up process-exit hook to close the socket */if (!pg_set_noblock(MyProcPort->sock)) ereport(COMMERROR, (errmsg("could not set socket to nonblocking mode: %m")));FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); // 只监视socket可写事件AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
}

从上面的代码可以看出WaitEevent模块只监控socket可写事件、LATCH_SET和POSTMASTER_DEATH事件。如下为可能唤醒 WaitLatch()、WaitLatchOrSocket() 或 WaitEventSetWait() 的事件的位掩码。

#define WL_LATCH_SET      (1 << 0)
#define WL_SOCKET_READABLE   (1 << 1)  // 可读事件
#define WL_SOCKET_WRITEABLE  (1 << 2)  // 可写事件
#define WL_TIMEOUT           (1 << 3) /* not for WaitEventSetWait() */
#define WL_POSTMASTER_DEATH  (1 << 4)
#define WL_EXIT_ON_PM_DEATH  (1 << 5)
#define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE /* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_MASK      (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE | WL_SOCKET_CONNECTED) // socket事件

socket_close函数在后端退出时关闭 libpq,用于注册为pg_on_exit_callback回调函数。

static void socket_close(int code, Datum arg) {  if (MyProcPort != NULL) { /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
#ifdef ENABLE_GSS...
#endif                          /* ENABLE_GSS *//* Cleanly shut down SSL layer.  Nowhere else does a postmaster child call this, so this is safe when interrupting BackendInitialize(). */secure_close(MyProcPort);/* Formerly we did an explicit close() here, but it seems better to leave the socket open until the process dies.  This allows clients to perform a "synchronous close" if they care --- wait till the transport layer reports connection closure, and you can be sure the backend has exited.* We do set sock to PGINVALID_SOCKET to prevent any further I/O, though. */MyProcPort->sock = PGINVALID_SOCKET;}
}

pq_comm_reset

pq_comm_reset函数在postgres后端进程错误处理longjmp点进行调用,用于复位libpq。

// src/include/libpq/libpq.h
typedef struct {void        (*comm_reset) (void);int            (*flush) (void);int         (*flush_if_writable) (void);bool        (*is_send_pending) (void);int           (*putmessage) (char msgtype, const char *s, size_t len);void        (*putmessage_noblock) (char msgtype, const char *s, size_t len);void        (*startcopyout) (void);void     (*endcopyout) (bool errorAbort);
} PQcommMethods;
#define pq_comm_reset() (PqCommMethods->comm_reset())

StreamConnection

StreamConnection函数主要在postmaster进程在为客户端创建PORT时使用。

static int ServerLoop(void){| -- nSockets = initMasks(&readmask);| -- for (;;){| -- if (pmState == PM_WAIT_DEAD_END)| -- else| -- selres = select(nSockets, &rmask, NULL, NULL, &timeout); // 监听新连接| -- if (selres > 0)  | -- for (i = 0; i < MAXLISTEN; i++)  | -- port = ConnCreate(ListenSocket[i]);| -- if (!(port = (Port *) calloc(1, sizeof(Port))))               | -- if (StreamConnection(serverFd, port) != STATUS_OK)

该函数使用服务器端口创建与客户端的新连接,将 port->sock 设置为新连接的 FD。假设条件是:这不需要是非阻塞的,因为 Postmaster 使用 select() 来判断服务器主套接字何时准备好接受 accept()。返回:STATUS_OK 或 STATUS_ERROR

int StreamConnection(pgsocket server_fd, Port *port) {   port->raddr.salen = sizeof(port->raddr.addr); /* accept connection and fill in the client (remote) address */if ((port->sock = accept(server_fd, (struct sockaddr *) &port->raddr.addr, &port->raddr.salen)) == PGINVALID_SOCKET){ereport(LOG,(errcode_for_socket_access(),errmsg("could not accept new connection: %m")));/* If accept() fails then postmaster.c will still see the server socket as read-ready, and will immediately try again.  To avoid uselessly sucking lots of CPU, delay a bit before trying again. (The most likely reason for failure is being out of kernel file table slots; we can do little except hope some will get freed up.) */pg_usleep(100000L);       /* wait 0.1 sec */return STATUS_ERROR;} port->laddr.salen = sizeof(port->laddr.addr); /* fill in the server (local) address */if (getsockname(port->sock,(struct sockaddr *) &port->laddr.addr,&port->laddr.salen) < 0) {elog(LOG, "getsockname() failed: %m"); return STATUS_ERROR;}if (!IS_AF_UNIX(port->laddr.addr.ss_family)){ /* select NODELAY and KEEPALIVE options if it's a TCP connection */ int         on;
#ifdef  TCP_NODELAYon = 1;if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)) < 0) {elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY"); return STATUS_ERROR;}
#endifon = 1;if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, sizeof(on)) < 0){elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE"); return STATUS_ERROR;}/* Also apply the current keepalive parameters.  If we fail to set a* parameter, don't error out, because these aren't universally* supported.  (Note: you might think we need to reset the GUC* variables to 0 in such a case, but it's not necessary because the* show hooks for these variables report the truth anyway.) */(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);(void) pq_setkeepalivescount(tcp_keepalives_count, port);(void) pq_settcpusertimeout(tcp_user_timeout, port);}return STATUS_OK;
}

pqcomm.c提供的这些例程用于处理前端和后端之间通信的低级细节。他们只是将数据推送到通信通道中,并且对数据的语义一无所知——或者将是,除了旧的 COPY OUT 协议设计中的重大脑损伤。不幸的是,COPY OUT 旨在控制通信通道(它只是传输数据而不将其包装到消息中)。 COPY OUT 正在进行时不能发送其他消息;如果副本被 ereport(ERROR) 中止,我们需要关闭副本,以便前端恢复同步。因此,这些例程必须知道 COPY OUT 状态。 (新的 COPY-OUT 是基于消息的,并且设置 DoingCopyOut 标志。)
注意:通常,直接使用 pq_putbytes() 发出传出消息是个坏主意,特别是如果消息需要多次调用才能发送。相反,使用 pqformat.c 中的例程在缓冲区中构造消息,然后在一次调用 pq_putmessage 时发出它。这确保了如果执行在消息中途被 ereport(ERROR) 中止,则通道不会被不完整的消息阻塞。唯一应该直接调用 pq_putbytes 的非 libpq 代码是老式的 COPY OUT。
曾经,libpq 在前端和后端之间共享,但现在后端的“backend/libpq”与“interfaces/libpq”完全分开。剩下的就是名字的相似性,以诱捕粗心的人……
These routines handle the low-level details of communication between frontend and backend. They just shove data across the communication channel, and are ignorant of the semantics of the data — or would be, except for major brain damage in the design of the old COPY OUT protocol. Unfortunately, COPY OUT was designed to commandeer the communication channel (it just transfers data without wrapping it into messages). No other messages can be sent while COPY OUT is in progress; and if the copy is aborted by an ereport(ERROR), we need to close out the copy so that the frontend gets back into sync. Therefore, these routines have to be aware of COPY OUT state. (New COPY-OUT is message-based and does not set the DoingCopyOut flag.)

NOTE: generally, it’s a bad idea to emit outgoing messages directly with pq_putbytes(), especially if the message would require multiple calls to send. Instead, use the routines in pqformat.c to construct the message in a buffer and then emit it in one call to pq_putmessage. This ensures that the channel will not be clogged by an incomplete message if execution is aborted by ereport(ERROR) partway through the message. The only non-libpq code that should call pq_putbytes directly is old-style COPY OUT.

At one time, libpq was shared between frontend and backend, but now the backend’s “backend/libpq” is quite separate from “interfaces/libpq”. All that remains is similarities of names to trap the unwary…

PostgreSQL数据库网络层——libpq服务端顶层接口相关推荐

  1. 魔坊APP项目-15-邀请好友(业务逻辑流程图、服务端提供邀请好友的二维码生成接口、客户端通过第三方识别微信二维码,服务端提供接口允许访问、App配置私有协议,允许第三方应用通过私有协议,唤醒APP)

    邀请好友 1.业务逻辑流程图 客户端提供点击"邀请好友"以后的页面frame,html/invite.html,代码: <!DOCTYPE html> <html ...

  2. 魔方APP项目-01-移动端开发相关概念、移动端自适配、元信息(meta)、开发准备、移动端项目搭建(模拟器调试)、APICloud(APICloud 前端框架,获取服务端API接口)

    一.移动端开发相关概念 1.APP类型 ①.Native APP Native APP又称原生APP,就是我们平时说的手机应用软件. 原生APP 是针对IOS.Android.Windows等不同的手 ...

  3. 聚播微信群控云控引擎二次开发SDK服务端对接接口

    聚播微信群控云控引擎二次开发SDK服务端对接接口 case HeartBeatReq: {// 客户端发送的心跳包heartBeatReqHandler.handleMsg(ctx, msgVo);b ...

  4. mysql数据库入门教程(1):数据库的相关概念,存储特点,软件安装教程,数据库启动,服务端登录退出

    1为什么使用数据库 使用 内存(数组,集合)存储数据,一断电容易消失数据. 使用文件存储数据,断电不消失.但存储大量数据时难以查找. 数据库:能实现数据持久化,方便查询 2数据库相关概念 DB(dat ...

  5. NPM酷库:cheerio,服务端jQuery接口实现

    NPM酷库,每天两分钟,了解一个流行NPM库. jQuery 是前端DOM操作的利器,我们通过jQuery的接口可以方便地访问.修改DOM树中的节点和内容. 有时,在Node.js服务端,我们也需要类 ...

  6. Android客户端与PHP服务端API接口Token安全验证

    Android客户端: 1.写一个生成token的算法 /*** 生成api接口的token* @param map* @param apikey* @return*/public static St ...

  7. 上传图片-服务端-Api接口定义

    API接口 模型类 系统的文件信息(图片.文档等小文件的信息)在mongodb中存储,下边是文件信息的模型类. 1) 模型如下: package com.learn.framework.domain. ...

  8. 一次服务端大面积接口响应时间骤增问题排查

    目录       一.事故背景       二.线上排查       三.问题确认       四.总结 一.事故背景 昨天下午16:05,突然接到接口报警,服务端接口响应时间陡增. 接到报警后,我们 ...

  9. endnote初始化数据库支持_服务端编程——数据库(MySQL、sequelize) - 天生笑点低你奈我何...

    一.数据库 前端发送api请求的流程 通过API发送请求,到model进行业务处理,将数据存到或在MYSQL查询,将数据一并给KOA服务器请求,最后将请求的结果返回给客户端 关系型数据库.非关系型数据 ...

最新文章

  1. logstash入门
  2. 根据xml生成相应的对象类
  3. 【matlab】设定C++编译器
  4. 中国宜坚持发展自主操作系统
  5. 【OS学习笔记】三十一 保护模式九:页目录、页表和页三者的关系详解
  6. linux脚本怎怎么屏蔽段落,怎么写shell脚本才能不耍流氓?
  7. JS - 将十六进制的颜色值转成rgb、rgba格式
  8. Pycharm如何取消自动换行
  9. Centos7中修改Hostname的方法
  10. 实现微信摇一摇部分功能
  11. 图卷积网络详细介绍(二)
  12. 简历推荐_12位AI产品经理
  13. 安装之后如何激活冰点还原软件?
  14. 角度和弧度的计算关系
  15. 51单片机引脚功能介绍
  16. gopher对mysql的利用_gopher协议的攻击利用
  17. Unity Editor 编辑器扩展 五 EditorGUI
  18. 【甘肃银行:进一步加大在区块链等领域的布局】GBCAX
  19. CSS 单词换行 word-break属性
  20. 在Centos系统下创建与Windows的共享文件夹

热门文章

  1. 零遁NAS伴侣实现WOL远程唤醒
  2. linux批量处理图片
  3. 阿里云服务器访问windows下网页(内网穿透)
  4. 什么是AWS Fargate
  5. python爬京东延迟加载_python大规模爬取京东
  6. 从磁盘到B树到B+树
  7. 电话号码的字母组合(C++实现)
  8. 开放式蓝牙耳机排行,列举几款值得推荐的开放式蓝牙耳机
  9. java学习之类方法
  10. MacOS图标文件 .icns 一键生成脚本