frp源码剖析-frp中的mux模块
前言
frp几乎所有的连接处理都是构建在mux模块之上的,重要性不必多说,来看一下这是个啥吧
ps: 安装方法
go get "github.com/fatedier/golib/net/mux"
该模块很小,不到300行,分为两个文件:mux.go
和rule.go
。
因为rule.go
文件相对简单一些,我们先来看这个。
role.go文件
首先看其中所命名的函数类型MatchFunc
:
type MatchFunc func(data []byte) (match bool)
该类型的函数用来判断data
属于什么协议。
那么具体如何判断呢,这里也实现了三个例子:
var (HttpsNeedBytesNum uint32 = 1HttpNeedBytesNum uint32 = 3YamuxNeedBytesNum uint32 = 2
)var HttpsMatchFunc MatchFunc = func(data []byte) bool {if len(data) < int(HttpsNeedBytesNum) {return false}if data[0] == 0x16 {return true} else {return false}
}// From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
var httpHeadBytes = map[string]struct{}{"GET": struct{}{},"HEA": struct{}{},"POS": struct{}{},"PUT": struct{}{},"DEL": struct{}{},"CON": struct{}{},"OPT": struct{}{},"TRA": struct{}{},"PAT": struct{}{},
}var HttpMatchFunc MatchFunc = func(data []byte) bool {if len(data) < int(HttpNeedBytesNum) {return false}_, ok := httpHeadBytes[string(data[:3])]return ok
}// From https://github.com/hashicorp/yamux/blob/master/spec.md
var YamuxMatchFunc MatchFunc = func(data []byte) bool {if len(data) < int(YamuxNeedBytesNum) {return false}if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 {return true}return false
}
这三个函数分别实现了区分HTTPS
,HTTP
以及go中特有的yamux
(实际上这是一个库,可以参考Go中的I/O多路复用)。
mux.go文件
先来看其中的struct
,第一个是Mux
第二个是listener
,这里先来看一下较为简单的listener
。
listener结构体
type listener struct {mux *Muxpriority intneedBytesNum uint32matchFn MatchFuncc chan net.Connmu sync.RWMutex
}// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (net.Conn, error) {...
}// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {...
}func (ln *listener) Addr() net.Addr {...
}
刚看到这个结构体我们可能很迷惑,不知道都是干啥的,而且网络编程中一般listener这种东西要绑定在一个套接字上,但很明显listener
没有,不过其唯一跟套接字相关的可能是其c
字段,其是一个由net
包中的Conn
接口组成的chanel
;然后mu
字段就是读写锁了,这个很简单;然后mux
字段则是上面提到的两个结构体中的另一个结构体Mux
的指针;接下来到了priority
字段上,顾名思义,这个似乎跟优先级有关系,暂且存疑;needBytesNum
则更有些蒙了,不过感觉其是跟读取byte的数量有关系,最后是matchFn
。
好,初步认识了这个结构体的结构后,我们看看其方法。三个方法的listener
实现了net
模块中的Listener
接口:
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {// Accept waits for and returns the next connection to the listener.Accept() (Conn, error)// Close closes the listener.// Any blocked Accept operations will be unblocked and return errors.Close() error// Addr returns the listener's network address.Addr() Addr
}
然后先来分析其Accept
方法:
func (ln *listener) Accept() (net.Conn, error) {conn, ok := <-ln.cif !ok {return nil, fmt.Errorf("network connection closed")}return conn, nil
}
该方法很简单,就是从c
这个由Conn
组成的channel
中,获取Conn
对象,好这里我们就明白了,这个listener
和普通的不一样,他很特别,普通的listener
监听的是套接字,而他监听的是channel
,另外,肯定有某个地方在不停的往c
这个channel
中放Conn
。
接下来是Close
方法:
func (ln *listener) Close() error {if ok := ln.mux.release(ln); ok {// Close done to signal to any RLock holders to release their lock.close(ln.c)}return nil
}
我们暂且先把这个ln.mux.release(ln)
放到一边,因为还不知道这个东西干了啥,暂且只需关注close(ln.c)
,我们知道这个函数是用来关闭channel
的,go推荐由发送端调用,但这里似乎listener
是一个消费端,可以看一下如何优雅的关闭Go Channel,看来重点在于ln.mux.release(ln)
这里,我们暂且存疑[1],留待下面解决。
最后是Addr
方法:
func (ln *listener) Addr() net.Addr {if ln.mux == nil {return nil}ln.mux.mu.RLock()defer ln.mux.mu.RUnlock()if ln.mux.ln == nil {return nil}return ln.mux.ln.Addr()
}
在这里,mu
字段就用上了,加读锁,然后返回mux
字段中的ln
字段的Addr
方法。也就是这句return ln.mux.ln.Addr()
。
Mux结构体
字段以及相关函数
Mux结构体则相对来说复杂很多,先来看一下他的字段定义:
type Mux struct {ln net.ListenerdefaultLn *listener// sorted by prioritylns []*listenermaxNeedBytesNum uint32mu sync.RWMutex
}
好,第一个字段ln
是一个Listener
接口;然后defaultLn
是一个listener
的指针;lns
则是由listener
的指针组成的切片,根据注释// sorted by priority
,我们终于知道listener
的priority
字段是干啥的了;接下来是maxNeedBytesNum
字段,好奇怪,比起listener
的needBytesNum
多了个“Max”,所以我们推测这个值取得是lns
以及defaultLn
字段中所有listener
中needBytesNum
值最大的;最后的mu
字段我们就不说了。
需要注意的是:我们可能会发现Mux
和listener
存在相互引用,但在Go
中我们倒也不用太担心,因为Go
采用“标记-回收”或者其变种的垃圾回收算法,感兴趣可以参考Golang 垃圾回收剖析
在mux.go
文件中定义了Mux
的生成函数NewMux
:
func NewMux(ln net.Listener) (mux *Mux) {mux = &Mux{ln: ln,lns: make([]*listener, 0),}return
}
很简单,需要注意的是ln
字段存储的一般不是listener
这样的非常规Listener,一般是TCPListener
这样具体的绑定了套接字的监听器。
Mux方法
接下来看Mux
结构体的方法,首先看Listen
和copyLns
// priority
func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {// 1ln := &listener{c: make(chan net.Conn),mux: mux,priority: priority,needBytesNum: needBytesNum,matchFn: fn,}mux.mu.Lock()defer mux.mu.Unlock()// 2if needBytesNum > mux.maxNeedBytesNum {mux.maxNeedBytesNum = needBytesNum}// 3newlns := append(mux.copyLns(), ln)sort.Slice(newlns, func(i, j int) bool {if newlns[i].priority == newlns[j].priority {return newlns[i].needBytesNum < newlns[j].needBytesNum}return newlns[i].priority < newlns[j].priority})mux.lns = newlnsreturn ln
}func (mux *Mux) copyLns() []*listener {lns := make([]*listener, 0, len(mux.lns))for _, l := range mux.lns {lns = append(lns, l)}return lns
}
copyLns
方法很简单,就是跟名字的含义一样,生成一个lns
字段的副本并返回。
Listen
基本做了三步:
- 生成一个
listener
结构体实例,并获取互斥锁 - 根据情况更新
needBytesNum
字段 - 将新生成的
listener
实例按照优先级放入lns
字段对应的slice中
接下来是ListenHttp
和ListenHttps
方法:
func (mux *Mux) ListenHttp(priority int) net.Listener {return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
}func (mux *Mux) ListenHttps(priority int) net.Listener {return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
}
这两个差不多,所以放到一起说,基本都是专门写了一个方法让我们能方便的创建处理Http
或者Https
的listener
。
再来看DefaultListener
方法:
func (mux *Mux) DefaultListener() net.Listener {mux.mu.Lock()defer mux.mu.Unlock()if mux.defaultLn == nil {mux.defaultLn = &listener{c: make(chan net.Conn),mux: mux,}}return mux.defaultLn
}
这个方法很简单,基本就是有则返回没有则生成然后返回的套路。不过我们要注意defaultLn
字段中的listener
是不放入lns
字段中的。
接下来是Server
方法:
// Serve handles connections from ln and multiplexes then across registered listeners.
func (mux *Mux) Serve() error {for {// Wait for the next connection.// If it returns a temporary error then simply retry.// If it returns any other error then exit immediately.conn, err := mux.ln.Accept()if err, ok := err.(interface {Temporary() bool}); ok && err.Temporary() {continue}if err != nil {return err}go mux.handleConn(conn)}
}
一般来说,当我们调用NewMux
函数以后,接下来就会调用Server
方法,该方法基本上就是阻塞监听某个套接字,当有连接建立成功后立即另起一个goroutine调用handleConn
方法;当连接建立失败根据err
是否含有Temporary
方法,如果有则执行并忽略错误,没有则返回错误。
现在我们看看handleConn
方法干了些啥:
func (mux *Mux) handleConn(conn net.Conn) {// 1mux.mu.RLock()maxNeedBytesNum := mux.maxNeedBytesNumlns := mux.lnsdefaultLn := mux.defaultLnmux.mu.RUnlock()// 2sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum))data := make([]byte, maxNeedBytesNum)conn.SetReadDeadline(time.Now().Add(DefaultTimeout))_, err := io.ReadFull(rd, data)if err != nil {conn.Close()return}conn.SetReadDeadline(time.Time{})// 3for _, ln := range lns {if match := ln.matchFn(data); match {err = errors.PanicToError(func() {ln.c <- sharedConn})if err != nil {conn.Close()}return}}// No match listenersif defaultLn != nil {err = errors.PanicToError(func() {defaultLn.c <- sharedConn})if err != nil {conn.Close()}return}// No listeners for this connection, close it.conn.Close()return
}
handleConn
方法也不算复杂,大体可以分为三步:
- 获取当前状态
- 从
conn
中读取数据,注意:shareConn
和rd
存在单向关系,如果从rd
中读取数据的话,数据也会复制一份放到shareConn
中,反过来就不成立了 - 读取到的数据会被遍历,最终选出
与matchFunc
匹配的最高优先级的listener
,并将shareConn
放入该listener
的c
字段中,如果没有匹配到则放到defaultLn
中的c
字段中,如果defaultLn
是nil
的话就不处理,直接关闭conn
。
最后来到了release
方法了:
func (mux *Mux) release(ln *listener) bool {result := falsemux.mu.Lock()defer mux.mu.Unlock()lns := mux.copyLns()for i, l := range lns {if l == ln {lns = append(lns[:i], lns[i+1:]...)result = truebreak}}mux.lns = lnsreturn result
}
release方法意思很明确:把对应的listener
从lns
中移除,并把结果返回,整个过程有互斥锁,我们回到存疑1,尽管有互斥锁,但在这种情况下:当某个goroutine运行到handleConn
已经执行到了第三阶段的开始状态(也就是还没有找到匹配的listener
)时,且Go
运行在多核状态下,当另一个goroutine运行完listener
的Close
方法时,这时就可能发生往一个已经关闭的channel
中send数据,但请注意handleConn
的第三步的这段代码:
err = errors.PanicToError(func() { // 就是这里了ln.c <- sharedConn
})
if err != nil {conn.Close()
}
这个PanicToError
是这样的:
func PanicToError(fn func()) (err error) {defer func() {if r := recover(); r != nil {err = fmt.Errorf("Panic error: %v", r)}}()fn()return
}
基本上就是执行了recover
然后将错误打印出来,结合下面的对err的判断,就会将send失败的conn关闭。
总结
Mux
中包含了一个初始监听器,基本上所有的事件(比如说新的连接建立,之所以叫事件是因为我实在想不出更精确的词语了)都起源于此listener
实现了net.Listener
接口,可以作为二级监听器使用(比如传给net/http.Server
结构体的Server
方法进行处理)。Mux
包含了一个由listener
组成的有序slice,当有事件产生时就会遍历这个slice找出合适的listener
并将事件传给他。
讲到这里基本上是完事了。整个mux
模块还是比较简单的,起码是由一个个简单的东西组合而成。那么一起来意淫一下整体流程吧。
假如我要实现这么一个网络程序:
- 绑定监听一个基于tcp的套接字
- 我们允许其应用层可支持多个(比如说支持http https这两个吧,尽管http和https可以说是一个协议。。),不同的应用层协议对应不同的处理函数
就这么两个很简单的要求,不难吧。
那么我们一起来实现吧:
type HandleFunc func(c net.Conn) (n int, err error) type MyServer struct {l net.ListenerhFunc HandleFunc
}func (h *MyServer) Server() (err error) {for {conn, err := h.l.Accept()if err != nil {return}go h.hFunc(conn)}
}func HandleHttp(c net.Conn)(n int, err error){n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?"))
}func HandleHttps(c net.Conn)(n int, err error){n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?"))
}func main() (err error){ln, err := net.Listen("tcp", "0.0.0.0:12345")if err != nil {err = fmt.Errorf("Create server listener error, %v", err)return}muxer = mux.NewMux(ln)var lHttp, lHttps net.ListenerlHttp = muxer.ListenHttp(1)httpServer := *MyServer{lHttp, HandleHttp}lHttps = muxer.ListenHttps(2)httpsServer := *MyServer{lHttps, HandleHttps}go httpServer.Server()go httpsServer.Server()err = muxer.Serve()
}
转载于:https://www.cnblogs.com/MnCu8261/p/10639897.html
frp源码剖析-frp中的mux模块相关推荐
- frp源码剖析-frp中的log模块
前言&引入 一个好的log模块可以帮助我们排错,分析,统计 一般来说log中需要有时间.栈信息(比如说文件名行号等),这些东西一般某些底层log模块已经帮我们做好了.但在业务中还有很多我们需要 ...
- Swoft 源码剖析 - Swoft 中的注解机制
作者:bromine 链接:https://www.jianshu.com/p/ef7... 來源:简书 著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版. Swoft Github ...
- python字符串代码对象_Python源码剖析 - Python中的字符串对象
1. 前言 我们已经在 [Python中的整数对象] 章节中对定长对象进行了详细的讲解,接下来我们将介绍变长对象,而字符串类型,则是这类对象的典型代表. 这里必须先引入一个概念: Python 中的变 ...
- python源码剖析—— python中的列表对象
1. PyListObject对象 PyListObject 对象可以有效地支持插入,添加,删除等操作,在 Python 的列表中,无一例外地存放的都是 PyObject 的指针.所以实际上,你可以这 ...
- python源码剖析—— python中的字节码对象初探
一.代码对象 每个初学python的人都会认为python是一种解释型语言,这个不能说错.但是python并不是真的对执行的python代码的每一行进行解释,虽然我们有一个所谓的"解释器&q ...
- 从源码剖析SpringBoot中Tomcat的默认最大连接数
为什么你的websocket只能建立256个连接?推出后,有许多小伙伴问:关键是怎么解决256这个问题.嗯,可能是我的标题起的有点问题,不过如果有认真阅读文章的话,应该会知道,其实256的限制是Chr ...
- 源码剖析Redis中如何使用跳表的
前言 阿里云今年春招校招面试题,面试官问Redis在是如何使用跳表的?让很多同学赶到很头疼.今天我们就来讲一讲吧. Sorted Set的结构 redis的数据类型中有序集合(sorted set)使 ...
- STL源码剖析 5中迭代器型别
最常使用的5种迭代器的型别 为 value_type.difference_type.pointer.reference.iterator_category. 如果想要自己开发的容器和STL进行适配, ...
- Redis源码剖析(六)事务模块
Redis允许客户端开启事务模式,在事务模式中,客户端输入的命令不会立即执行而是被保存在事务队列中,只有当客户端输入事务运行命令时,Redis才会将事务队列中的所有命令按照FIFO的顺序一个个执行 一 ...
最新文章
- 不要62 ---数位DP
- 多对多的属性对应表如何做按照类别的多属性匹配搜索
- CentOS查看CPU、内存、版本等系统信息
- Ext JS 4.1 RC2 Released发布
- 美团架构师写的Java面试宝典_2019最新美团java面试题及答案
- 《此生未完成》痛句摘抄(3)
- cmd命令操作Oracle数据库
- WPF——Expander控件(转)
- Intel 64/x86_64/IA-32/x86处理器 - SIMD指令集 - MMX技术(4) - 比较指令
- 系统更新链接服务器超时,win10系统更新导致Dr.com连接认证服务器超时的解决方法...
- PHP5.2\5.3 Xdebug 调试器配置及应用
- PDF文档阅读必备的PDF阅读器
- 百度SEO站群PTCMS全自动采集小说网站源码
- PS如何做文字扫描效果(用于较淡的扫描件)
- 《工程学导论》读书笔记-2
- ​SIGIR 2022 | 港大、武大提出KGCL:基于知识图谱对比学习的推荐系统
- Flutter APNS device token not set before retrieving FCM Token for Sender ID
- 计算机网络地址块例题,计算机网络习题计算机络习题.ppt
- python要学什么英文歌_Python分析网易云音乐近5年热门歌单
- [Linux]命令查找一个文件