反应器模式

在以前的博文模式设计概述:反应器(Reactor)模式介绍过相关的概念和流程,当时使用了python但是从结果上来看并没有起到很明显的效果。最近在处理有关proxy的项目中,刚刚好涉及到有关性能的问题,故本文探索一下go的反应器模式的探索过程,当前比较知名的项目有两个一个是evio和gnet,都是反应器模式的很好的实现范例,特别是gnet在反应器模式上还加入了协程池从而比evio性能更好,本文就从头开始探索如何一步步优化改进。

go原生服务流程

package mainimport ("fmt""io""log""net"
)func worker(conn net.Conn) {defer conn.Close()b := make([]byte, 512)for {size, err := conn.Read(b)if err == io.EOF {break}if err != nil {log.Fatal(err)}fmt.Printf("Received %v bytes from %v\n", size, conn.RemoteAddr())size, err = conn.Write(b[0:size])if err != nil {log.Fatal(err)}fmt.Printf("Written %v bytes to %v\n", size, conn.RemoteAddr())}
}func main() {listener, err := net.Listen("tcp", "127.0.0.1:1234")if err != nil {log.Fatal(err)}fmt.Printf("Listering on %v\n", listener.Addr())for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}fmt.Printf("Accepted connection to %v from %v\n", conn.LocalAddr(), conn.RemoteAddr())go worker(conn)}
}

此时go run main.go运行该程序,并在新开的终端中直接使用telnet 127.0.0.1 1234。运行效果就是发送的内容与返回内容相同。此时我们先探索一下原生的net库是的流程如何。

net流程-Listen

由于环境是mac环境故源码跳转的都是对应到了posix的相关接口下。

func Listen(network, address string) (Listener, error) {var lc ListenConfigreturn lc.Listen(context.Background(), network, address)
}

listen直接返回了ListenConfig通过Listen方法创建的实例。

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil) // 解析地址if err != nil {return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}}sl := &sysListener{ListenConfig: *lc,network:      network,address:      address,}var l Listenerla := addrs.first(isIPv4)       // 检查监听的地址类型switch la := la.(type) {case *TCPAddr:l, err = sl.listenTCP(ctx, la)      // 监听的是TCPcase *UnixAddr:l, err = sl.listenUnix(ctx, la)   // 监听的是套接字default:return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}}if err != nil {return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer}return l, nil    // 返回创建的监听实例
}

此时可以看出整个流程跟平常的都是相同,先是解析监听地址,然后判断是否TCP或者Unix监听的类型。

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)   // 初始化fdif err != nil {return nil, err}return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}...func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() {raddr = raddr.toLocal(net)}family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)  // 根据不同平台设置地址并初始化
}...// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {s, err := sysSocket(family, sotype, proto)  // 初始化socketif err != nil {return nil, err}if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {  // 设置optpoll.CloseFunc(s)return nil, err}if fd, err = newFD(s, family, sotype, net); err != nil {  // 获取fdpoll.CloseFunc(s)return nil, err}// This function makes a network file descriptor for the// following applications://// - An endpoint holder that opens a passive stream//   connection, known as a stream listener//// - An endpoint holder that opens a destination-unspecific//   datagram connection, known as a datagram listener//// - An endpoint holder that opens an active stream or a//   destination-specific datagram connection, known as a//   dialer//// - An endpoint holder that opens the other connection, such//   as talking to the protocol stack inside the kernel//// For stream and datagram listeners, they will only require// named sockets, so we can assume that it's just a request// from stream or datagram listeners when laddr is not nil but// raddr is nil. Otherwise we assume it's just for dialers or// the other connection holders.if laddr != nil && raddr == nil {switch sotype {case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:     // 根据不同类型生成不同的监听类型if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {fd.Close()return nil, err}return fd, nilcase syscall.SOCK_DGRAM:if err := fd.listenDatagram(laddr, ctrlFn); err != nil {fd.Close()return nil, err}return fd, nil}}if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {  // 设置对应的地址信息fd.Close()return nil, err}return fd, nil
}

至此,如果参数等都正确就初始化完成了一个监听的fd。接下来就是接受请求。

net流程-Accept
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {fd *netFD                      // 初始化好的fdlc ListenConfig           // 监听的配置
}// SyscallConn returns a raw network connection.
// This implements the syscall.Conn interface.
//
// The returned RawConn only supports calling Control. Read and
// Write return an error.
func (l *TCPListener) SyscallConn() (syscall.RawConn, error) {if !l.ok() {return nil, syscall.EINVAL}return newRawListener(l.fd)
}// AcceptTCP accepts the next incoming call and returns the new
// connection.
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {if !l.ok() {return nil, syscall.EINVAL}c, err := l.accept()if err != nil {return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return c, nil
}// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {if !l.ok() {return nil, syscall.EINVAL}c, err := l.accept()   // 接受请求if err != nil {return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return c, nil
}// Close stops listening on the TCP address.
// Already Accepted connections are not closed.
func (l *TCPListener) Close() error {  // 关闭请求if !l.ok() {return syscall.EINVAL}if err := l.close(); err != nil {return &OpError{Op: "close", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return nil
}// Addr returns the listener's network address, a *TCPAddr.
// The Addr returned is shared by all invocations of Addr, so
// do not modify it.
func (l *TCPListener) Addr() Addr { return l.fd.laddr }  // 获取地址// SetDeadline sets the deadline associated with the listener.
// A zero time value disables the deadline.
func (l *TCPListener) SetDeadline(t time.Time) error {  // 设置超时时间if !l.ok() {return syscall.EINVAL}if err := l.fd.pfd.SetDeadline(t); err != nil {return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return nil
}// File returns a copy of the underlying os.File.
// It is the caller's responsibility to close f when finished.
// Closing l does not affect f, and closing f does not affect l.
//
// The returned os.File's file descriptor is different from the
// connection's. Attempting to change properties of the original
// using this duplicate may or may not have the desired effect.
func (l *TCPListener) File() (f *os.File, err error) {   // 获取文件描述符if !l.ok() {return nil, syscall.EINVAL}f, err = l.file()if err != nil {return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return
}

通过Accept、Close相关方法查看,都是调用了进一步封装的函数并且通过层层嵌套和不同系统的封装,其中会调用到如下函数。

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {if !l.ok() {return nil, syscall.EINVAL}c, err := l.accept()if err != nil {return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return c, nil
}...func (ln *TCPListener) accept() (*TCPConn, error) {fd, err := ln.fd.accept()if err != nil {return nil, err}tc := newTCPConn(fd)if ln.lc.KeepAlive >= 0 {setKeepAlive(fd, true)ka := ln.lc.KeepAliveif ln.lc.KeepAlive == 0 {ka = defaultTCPKeepAlive}setKeepAlivePeriod(fd, ka)}return tc, nil
}...func (fd *netFD) accept() (netfd *netFD, err error) {d, rsa, errcall, err := fd.pfd.Accept()if err != nil {if errcall != "" {err = wrapSyscallError(errcall, err)}return nil, err}if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {poll.CloseFunc(d)return nil, err}if err = netfd.init(); err != nil {   // 将该fd添加到系统的Poll中,监控是否有读写的事件fd.Close()return nil, err}lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))return netfd, nil
}...func accept(s int) (int, syscall.Sockaddr, string, error) {// See ../syscall/exec_unix.go for description of ForkLock.// It is probably okay to hold the lock across syscall.Accept// because we have put fd.sysfd into non-blocking mode.// However, a call to the File method will put it back into// blocking mode. We can't take that risk, so no use of ForkLock here.ns, sa, err := AcceptFunc(s)if err == nil {syscall.CloseOnExec(ns)}if err != nil {return -1, nil, "accept", err}if err = syscall.SetNonblock(ns, true); err != nil {   // 接受完成的连接默认设置成了非阻塞CloseFunc(ns)return -1, nil, "setnonblock", err}return ns, sa, "", nil
}...// AcceptFunc is used to hook the accept call.
var AcceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept...//syscall/syscall_bsd.go
func Accept(fd int) (nfd int, sa Sockaddr, err error) {var rsa RawSockaddrAnyvar len _Socklen = SizeofSockaddrAnynfd, err = accept(fd, &rsa, &len)  // 接受连接 通过封装不同系统的syscall来接受连接if err != nil {return}if runtime.GOOS == "darwin" && len == 0 {// Accepted socket has no address.// This is likely due to a bug in xnu kernels,// where instead of ECONNABORTED error socket// is accepted, but has no address.Close(nfd)return 0, nil, ECONNABORTED}sa, err = anyToSockaddr(&rsa)if err != nil {Close(nfd)nfd = 0}return
}...func accept(s int, rsa *RawSockaddrAny, addrlen *_Socklen) (fd int, err error) {r0, _, e1 := syscall(funcPC(libc_accept_trampoline), uintptr(s), uintptr(unsafe.Pointer(rsa)), uintptr(unsafe.Pointer(addrlen)))  // 系统调用访问libc_accept_trampoline函数并传入参数fd = int(r0)if e1 != 0 {err = errnoErr(e1)}return
}

从Accept的流程可知,通过了层层的嵌套,最终分发到了syscall中的针对不同平台的accept的系统调用的封装,并且其他的函数也是如此封装调用。其中接受的请求通过netfd.init()函数默认的加入到系统的poll中,从而完成接受的事件通过go系统中的poll来进行事件通知与驱动。

性能对比-go原生与evio

go原生
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.package mainimport ("flag""fmt""io""log""net""os""strconv""strings""time"
)var res stringtype request struct {proto, method stringpath, query   stringhead, body    stringremoteAddr    string
}func worker(conn net.Conn) {defer conn.Close()data := make([]byte, 512)for {_, err := conn.Read(data)if err == io.EOF {break}if err != nil {//log.Fatal(err)return}var req requestvar out []byteleftover, err := parsereq(data, &req)if err != nil {// bad thing happenedout = appendresp(out, "500 Error", "", err.Error()+"\n")break} else if len(leftover) == len(data) {// request not ready, yetbreak}// handle the requestreq.remoteAddr = conn.RemoteAddr().String()out = appendhandle(out, &req)data = leftover_, err = conn.Write(out)if err != nil {//log.Fatal(err)}}
}func main() {var port intvar loops intvar aaaa boolvar noparse boolvar unixsocket stringvar stdlib boolflag.StringVar(&unixsocket, "unixsocket", "", "unix socket")flag.IntVar(&port, "port", 8080, "server port")flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")flag.BoolVar(&noparse, "noparse", true, "do not parse requests")flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")flag.IntVar(&loops, "loops", 0, "num loops")flag.Parse()if os.Getenv("NOPARSE") == "1" {noparse = true}if aaaa {res = strings.Repeat("a", 1024)} else {res = "Hello World!\r\n"}listener, err := net.Listen("tcp", "127.0.0.1:8080")if err != nil {log.Fatal(err)}fmt.Printf("Listering on %v\n", listener.Addr())for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}go worker(conn)}
}// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {return appendresp(b, "200 OK", "", res)
}// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {b = append(b, "HTTP/1.1"...)b = append(b, ' ')b = append(b, status...)b = append(b, '\r', '\n')b = append(b, "Server: evio\r\n"...)b = append(b, "Date: "...)b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, "Content-Length: "...)b = strconv.AppendInt(b, int64(len(body)), 10)b = append(b, '\r', '\n')}b = append(b, head...)b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, body...)}return b
}// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {sdata := string(data)var i, s intvar top stringvar clen intvar q = -1// method, path, proto linefor ; i < len(sdata); i++ {if sdata[i] == ' ' {req.method = sdata[s:i]for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '?' && q == -1 {q = i - s} else if sdata[i] == ' ' {if q != -1 {req.path = sdata[s:q]req.query = req.path[q+1 : i]} else {req.path = sdata[s:i]}for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '\n' && sdata[i-1] == '\r' {req.proto = sdata[s:i]i, s = i+1, i+1break}}break}}break}}if req.proto == "" {return data, fmt.Errorf("malformed request")}top = sdata[:s]for ; i < len(sdata); i++ {if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {line := sdata[s : i-1]s = i + 1if line == "" {req.head = sdata[len(top)+2 : i+1]i++if clen > 0 {if len(sdata[i:]) < clen {break}req.body = sdata[i : i+clen]i += clen}return data[i:], nil}if strings.HasPrefix(line, "Content-Length:") {n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)if err == nil {clen = int(n)}}}}// not enough datareturn data, nil
}

压测结果

 wrk -t8 -c200 -d60s --latency  http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:80808 threads and 200 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency     8.43ms   18.64ms 365.62ms   90.07%Req/Sec     9.13k     5.54k   40.76k    70.50%Latency Distribution50%    1.51ms75%    4.95ms90%   26.88ms99%   90.95ms4244898 requests in 1.00m, 421.02MB readSocket errors: connect 0, read 353878, write 30, timeout 0
Requests/sec:  70651.03
Transfer/sec:      7.01MB
evio代码
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.package mainimport ("bytes""flag""fmt""log""os""strconv""strings""time""github.com/tidwall/evio"
)var res stringtype request struct {proto, method stringpath, query   stringhead, body    stringremoteAddr    string
}func main() {var port intvar loops intvar aaaa boolvar noparse boolvar unixsocket stringvar stdlib boolflag.StringVar(&unixsocket, "unixsocket", "", "unix socket")flag.IntVar(&port, "port", 8080, "server port")flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")flag.BoolVar(&noparse, "noparse", true, "do not parse requests")flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")flag.IntVar(&loops, "loops", 0, "num loops")flag.Parse()if os.Getenv("NOPARSE") == "1" {noparse = true}if aaaa {res = strings.Repeat("a", 1024)} else {res = "Hello World!\r\n"}var events evio.Eventsevents.NumLoops = loopsevents.Serving = func(srv evio.Server) (action evio.Action) {log.Printf("http server started on port %d (loops: %d)", port, srv.NumLoops)if unixsocket != "" {log.Printf("http server started at %s", unixsocket)}if stdlib {log.Printf("stdlib")}return}events.Opened = func(c evio.Conn) (out []byte, opts evio.Options, action evio.Action) {c.SetContext(&evio.InputStream{})//log.Printf("opened: laddr: %v: raddr: %v", c.LocalAddr(), c.RemoteAddr())return}events.Closed = func(c evio.Conn, err error) (action evio.Action) {//log.Printf("closed: %s: %s", c.LocalAddr().String(), c.RemoteAddr().String())return}events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {if in == nil {return}is := c.Context().(*evio.InputStream)data := is.Begin(in)if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {// for testing minimal single packet request -> response.out = appendresp(nil, "200 OK", "", res)return}// process the pipelinevar req requestfor {leftover, err := parsereq(data, &req)if err != nil {// bad thing happenedout = appendresp(out, "500 Error", "", err.Error()+"\n")action = evio.Closebreak} else if len(leftover) == len(data) {// request not ready, yetbreak}// handle the requestreq.remoteAddr = c.RemoteAddr().String()out = appendhandle(out, &req)data = leftover}is.End(data)return}var ssuf stringif stdlib {ssuf = "-net"}// We at least want the single http address.addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}if unixsocket != "" {addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))}// Start serving!log.Fatal(evio.Serve(events, addrs...))
}// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {return appendresp(b, "200 OK", "", res)
}// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {b = append(b, "HTTP/1.1"...)b = append(b, ' ')b = append(b, status...)b = append(b, '\r', '\n')b = append(b, "Server: evio\r\n"...)b = append(b, "Date: "...)b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, "Content-Length: "...)b = strconv.AppendInt(b, int64(len(body)), 10)b = append(b, '\r', '\n')}b = append(b, head...)b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, body...)}return b
}// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {sdata := string(data)var i, s intvar top stringvar clen intvar q = -1// method, path, proto linefor ; i < len(sdata); i++ {if sdata[i] == ' ' {req.method = sdata[s:i]for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '?' && q == -1 {q = i - s} else if sdata[i] == ' ' {if q != -1 {req.path = sdata[s:q]req.query = req.path[q+1 : i]} else {req.path = sdata[s:i]}for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '\n' && sdata[i-1] == '\r' {req.proto = sdata[s:i]i, s = i+1, i+1break}}break}}break}}if req.proto == "" {return data, fmt.Errorf("malformed request")}top = sdata[:s]for ; i < len(sdata); i++ {if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {line := sdata[s : i-1]s = i + 1if line == "" {req.head = sdata[len(top)+2 : i+1]i++if clen > 0 {if len(sdata[i:]) < clen {break}req.body = sdata[i : i+clen]i += clen}return data[i:], nil}if strings.HasPrefix(line, "Content-Length:") {n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)if err == nil {clen = int(n)}}}}// not enough datareturn data, nil
}

启动过程中配置go run main_http_evio.go -loops=3,启动三个协程来进行处理。

通过wrk工具进行压测,测试命令wrk -t8 -c200 -d60s --latency http://127.0.0.1:8080

wrk -t8 -c200 -d60s --latency  http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:80808 threads and 200 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency     6.12ms   17.23ms 291.61ms   93.60%Req/Sec    13.83k     4.13k   46.02k    77.73%Latency Distribution50%    1.43ms75%    2.44ms90%   11.82ms99%   91.58ms6564343 requests in 1.00m, 651.07MB read
Requests/sec: 109237.85
Transfer/sec:     10.83MB
go原生 evio
请求总数 4244898 6564343
Qps 70651.03 109237.85

简单的性能对比可以看出,evio的实现模型比原生的要高大约四五十,具体来查看一下在go下面如何使用自己的事件驱动模型。

evio流程

查看的代码基于mac操作系统。

func Serve(events Events, addr ...string) error {var lns []*listenerdefer func() {for _, ln := range lns {ln.close()}}()var stdlib boolfor _, addr := range addr {  // 检查是否监听的多个地址var ln listenervar stdlibt boolln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)if stdlibt {stdlib = true}if ln.network == "unix" {os.RemoveAll(ln.addr)}var err errorif ln.network == "udp" {if ln.opts.reusePort {ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)} else {ln.pconn, err = net.ListenPacket(ln.network, ln.addr)}} else {if ln.opts.reusePort {ln.ln, err = reuseportListen(ln.network, ln.addr)} else {ln.ln, err = net.Listen(ln.network, ln.addr)  // 获取监听的连接}}if err != nil {return err}if ln.pconn != nil {ln.lnaddr = ln.pconn.LocalAddr()} else {ln.lnaddr = ln.ln.Addr()}if !stdlib {if err := ln.system(); err != nil {return err}}lns = append(lns, &ln)}if stdlib {return stdserve(events, lns)}return serve(events, lns)   // 处理连接
}...func serve(events Events, listeners []*listener) error {// figure out the correct number of loops/goroutines to use.numLoops := events.NumLoops   // 获取开启的工作协程数量if numLoops <= 0 {if numLoops == 0 {numLoops = 1} else {numLoops = runtime.NumCPU()}}s := &server{}s.events = eventss.lns = listenerss.cond = sync.NewCond(&sync.Mutex{})s.balance = events.LoadBalance  // 复杂均衡策略s.tch = make(chan time.Duration)//println("-- server starting")if s.events.Serving != nil {var svr Serversvr.NumLoops = numLoopssvr.Addrs = make([]net.Addr, len(listeners))for i, ln := range listeners {svr.Addrs[i] = ln.lnaddr}action := s.events.Serving(svr)switch action {case None:case Shutdown:return nil}}defer func() {// wait on a signal for shutdowns.waitForShutdown()    // 等待停止信号// notify all loops to close by closing all listenersfor _, l := range s.loops {l.poll.Trigger(errClosing)}// wait on all loops to complete reading eventss.wg.Wait()// close loops and all outstanding connectionsfor _, l := range s.loops {  // 关闭所有还在的连接信息for _, c := range l.fdconns {loopCloseConn(s, l, c, nil)}l.poll.Close()}//println("-- server stopped")}()// create loops locally and bind the listeners.for i := 0; i < numLoops; i++ {   // 初始化多个loop,每个loop就是工作的协程l := &loop{idx:     i,poll:    internal.OpenPoll(),packet:  make([]byte, 0xFFFF),fdconns: make(map[int]*conn),}for _, ln := range listeners {l.poll.AddRead(ln.fd)                // 监听server的id来监听新进来的连接}s.loops = append(s.loops, l)}// start loops in backgrounds.wg.Add(len(s.loops))for _, l := range s.loops {go loopRun(s, l)     // 运行}return nil
}...func loopRun(s *server, l *loop) {defer func() {//fmt.Println("-- loop stopped --", l.idx)s.signalShutdown()s.wg.Done()}()if l.idx == 0 && s.events.Tick != nil {go loopTicker(s, l)}//fmt.Println("-- loop started --", l.idx)l.poll.Wait(func(fd int, note interface{}) error {if fd == 0 {return loopNote(s, l, note)}c := l.fdconns[fd]  switch {case c == nil:return loopAccept(s, l, fd)   // 处理新进来的请求case !c.opened:return loopOpened(s, l, c)case len(c.out) > 0:return loopWrite(s, l, c)       // 处理写请求case c.action != None:return loopAction(s, l, c)default:return loopRead(s, l, c)           // 处理读请求}})
}...// Poll ...
type Poll struct {fd      intchanges []syscall.Kevent_t   // mac下面的事件驱动模型notes   noteQueue
}// OpenPoll ...
func OpenPoll() *Poll {l := new(Poll)p, err := syscall.Kqueue()    // 生成事件实例if err != nil {panic(err)}l.fd = p_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{Ident:  0,Filter: syscall.EVFILT_USER,Flags:  syscall.EV_ADD | syscall.EV_CLEAR,}}, nil, nil)    // 添加server的读请求if err != nil {panic(err)}return l
}// Close ...
func (p *Poll) Close() error {return syscall.Close(p.fd)   // 关闭fd
}// Trigger ...
func (p *Poll) Trigger(note interface{}) error {p.notes.Add(note)_, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{Ident:  0,Filter: syscall.EVFILT_USER,Fflags: syscall.NOTE_TRIGGER,}}, nil, nil)return err
}// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {events := make([]syscall.Kevent_t, 128)for {n, err := syscall.Kevent(p.fd, p.changes, events, nil)  // 循环获取当前触发的事件if err != nil && err != syscall.EINTR {return err}p.changes = p.changes[:0]            // 清空当前事件 if err := p.notes.ForEach(func(note interface{}) error {return iter(0, note) }); err != nil {return err}for i := 0; i < n; i++ {if fd := int(events[i].Ident); fd != 0 {if err := iter(fd, nil); err != nil {   // 根据回调函数依次处理触发的事件循环return err}}}}
}// AddRead ...
func (p *Poll) AddRead(fd int) {p.changes = append(p.changes,syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,},)
}
...

从evio的整个流程可知,evio完全将网络io的所有事情自行处理,完全剥离了网络事件,从而自行实现了多sub-reactor的反应器模式,这也印证了为啥evio的性能比原生高的一个特点。

总结

本文只是简单的探索了一下go中的反应器模式的现状,原生的go服务是共用了go的netpoll(后续有机会会深入学习一下),所有的事件驱动与其他的协程调度等都在一起,而evio则是自己实现了事件驱动,将所有的事件驱动全部自己管理与调度,从而更加高效的响应请求,根据evio的思路大家有兴趣也可以尝试实现一个反应器模式的事件驱动器。由于本人才疏学浅,如有错误请批评指正。

反应器(Reactor)模式-golang探索相关推荐

  1. 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )

    文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...

  2. ACE反应器(Reactor)模式

    1.ACE反应器框架简介 反应器(Reactor):用于事件多路分离和分派的体系结构模式 通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞.所谓阻塞方式的意思是指, 当试图对 ...

  3. 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )

    文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...

  4. 【gev】 Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

    gev 轻量.快速的 Golang 网络库 https://github.com/Allenxuxu/gev gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 ...

  5. golang mysql 非阻塞_Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库...

    gev 轻量.快速的 Golang 网络库 gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue ...

  6. Reactor模式:反应器模式

    Reactor这个词译成汉语还真没有什么合适的,很多地方叫反应器模式,但更多好像就直接叫reactor模式了,其实我觉着叫应答者模式更好理解一些.通过了解,这个模式更像一个侍卫,一直在等待你的召唤,或 ...

  7. Java设计模式之Reactor(反应器)模式初探

    文章来源: https://blog.csdn.net/pistolove/article/details/53152708 http://www.blogjava.net/DLevin/archiv ...

  8. 【Netty】主从反应器 ( Reactor ) 多线程模型

    文章目录 一. 主从 反应器 ( Reactor ) 多线程 模式 二. 主从 反应器 ( Reactor ) 多线程 工作流程 三. 主从 反应器 ( Reactor ) 多线程 优缺点分析 四. ...

  9. libevent之Reactor模式详解

    转自:http://blog.csdn.net/sparkliang/article/details/4957667 前面讲到,整个libevent本身就是一个Reactor,因此本节将专门对Reac ...

最新文章

  1. 微信扫码支付功能详细教程————Java
  2. oracle基础之工具系列(持续更新中,,)
  3. 元宇宙iwemeta:赵长鹏指出,监管不懂区块链,币安是区块链技术
  4. 安徽一个班37人考进清华北大,老师发来一则短信,家长沉默了
  5. Java学习之路(一):日常第一课,认识JAVA
  6. PL0编译器TurboPascal版再现时间:2009-07-20 17:24:49来源:网络 作者:未知 点击:52次
  7. mysql 与gemfire的同步_(转)分布式缓存GemFire架构介绍
  8. 【读书笔记】《Javascript语言精粹》
  9. [转] 接触C# 反射 2
  10. 吴恩达神经网络和深度学习-学习笔记-20-训练/开发/测试集划分
  11. python中oxf2是多少_Python学习笔记[2]
  12. 《微观经济学》第三章相互依存性与贸易的好处
  13. JAVA运行内存的设置
  14. 北理工python程序设计学习笔记——(三)turtle八边形绘制
  15. python抓取微博评论的图片_用Python语言爬虫抓取微博评论图文教程
  16. 使用latex导出IEEE文献格式
  17. hdu5078 hdu5074 顺便写一写鞍山
  18. apache的url重写
  19. 【杂谈】她养花10年从来没买过花盆,简单几招,废物变盆栽!
  20. 下一个行业风口:NFT 数字藏品,是机遇还是泡沫?

热门文章

  1. 5分钟学会打游戏的活体人脑细胞,比 AI 学习速度更快
  2. AI 被当做炒作工具?
  3. AI市场扩大催生多样化标注需求
  4. 针对《评人工智能如何走向新阶段》一文,继续发布国内外的跟贴留言第二部557-561条如下
  5. 豪气!华为放话:3年培养100万AI人才!网友神回应了
  6. 简单的特征值梯度剪枝,CPU和ARM上带来4-5倍的训练加速 | ECCV 2020
  7. 炸裂!这些大厂跪求的人才太牛了!
  8. 6个步骤,告诉你如何用树莓派和机器学习DIY一个车牌识别器!(附详细分析)...
  9. 华为云垃圾分类AI大赛三强出炉,ModelArts2.0让行业按下AI开发“加速键”
  10. 我在美团的这两年,想和你分享