P2P网络编程-2-案例实践:P2P聊天应用
文章目录
- 一、初代版本
- 1.1 简介
- 1.2 代码与解析
- 1.3 测试运行
- 二、节点发现
- 2.1 简介
- 2.2 代码与解析
- 2.3 测试运行
- 三、总结
- 3.1 libp2p节点发现构建流程
- 3.2 libp2p中地址的转换关系
上一节学习了IPFS项目中P2P网络的构建工具libp2p的基本概念以及简单案例
接下来通过官方的聊天案例进一步了解该项目的使用
项目地址: https://github.com/libp2p/go-libp2p/tree/master/examples
我们从初代版本(手动发送接收)p2p聊天到具有节点发现功能的完全去中心化聊天学习libp2p的使用
一、初代版本
1.1 简介
项目使用Libp2p实现了一个简单的p2p聊天应用。该项目应用在两个节点中,需要至少满足下列条件一个:
- 都有一个私有IP地址(在同一个网络下)
- 至少有一个节点有一个公网地址
1.2 代码与解析
已将所有的注解写在代码中,可直接查看代码p2pChat.go
package mainimport ("bufio""context""crypto/rand""flag""fmt""github.com/libp2p/go-libp2p""github.com/libp2p/go-libp2p-core/crypto""github.com/libp2p/go-libp2p-core/host""github.com/libp2p/go-libp2p-core/network""github.com/libp2p/go-libp2p-core/peer""github.com/libp2p/go-libp2p-core/peerstore"mutiaddr "github.com/multiformats/go-multiaddr""io""log"mrand "math/rand""os"
)//
// handleStream
// @Description: 流处理函数
// @param s 新的数据流
//
func handleStream(s network.Stream) {log.Println("获得了新的流!")// 根据新的流创建读写流rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))// 创建两个携程分别读写数据go readData(rw)go writeData(rw)// 流s始终开启,直到流两端的任何一方关闭他
}//
// readData
// @Description: 按行读取字符串输出
// @param rw
//
func readData(rw *bufio.ReadWriter) {for {s, _ := rw.ReadString('\n')if s == "" {return}if s != "\n" {fmt.Printf("\x1b[32m%s\x1b[0m>", s)}}
}//
// writeData
// @Description: 获取标准输入,按行写入数据
// @param rw
//
func writeData(rw *bufio.ReadWriter) {// 1. 创建标准输入流stdReader := bufio.NewReader(os.Stdin)for {fmt.Printf(">")// 2. 读取标准输入s, err := stdReader.ReadString('\n')if err != nil {log.Panic(err)return}// 3. 使用读写流进行写入rw.WriteString(fmt.Sprintf("%s\n", s))rw.Flush()}
}//
// makeHost
// @Description: 生成一个节点主机
// @param ctx 上下文环境
// @param port 端口
// @param randomness 随机源
// @return host.Host
// @return error
//
func makeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) {// 1. 使用RSA和随机数流创建密钥对privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness)if err != nil {log.Panic(err)return nil, err}// 2. 根据端口构建多重地址newMultiaddr, err := mutiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))// 3. 创建节点return libp2p.New(ctx,libp2p.ListenAddrs(newMultiaddr), // 设置地址libp2p.Identity(privKey), // 设置密钥)
}//
// startPeer
// @Description: 节点被连接时开启流处理协议,仅被使用在流接受端
// @param ctx
// @param host
// @param streamHandler
//
func startPeer(host host.Host, streamHandler network.StreamHandler) {// 1. 设置流处理函数host.SetStreamHandler("/chat/1.0.0", streamHandler)// 2. 获取实际节点的TCP端口var port stringfor _, la := range host.Network().ListenAddresses() {// 获取当前节点Tcp协议监听的端口号// ValueForProtocol作用:获取mutiAddr中特殊协议的特殊值if tcpPort, err := la.ValueForProtocol(mutiaddr.P_TCP); err == nil {port = tcpPortbreak}}// 3. 输出if port == "" {log.Println("不能够找到真实本地端口")return}// host.ID().Pretty(): 返回节点ID的base58编码log.Printf("运行 './p2pChat -d /ip4/127.0.0.1/tcp/%v/p2p/%s' 在另一个控制台", port, host.ID().Pretty())log.Println("你可以用公网IP代替127.0.0.1")log.Println("等待连接中...")log.Println()
}//
// startPeerAndConnect
// @Description: 链接目标节点并创建流
// @param ctx 上下文
// @param host 主机节点
// @param destination 目标节点字符串
// @return *bufio.ReadWriter 缓冲读写流
// @return error
//
func startPeerAndConnect(host host.Host, destination string) (*bufio.ReadWriter, error) {// 输出一下主机地址信息for _, la := range host.Addrs() {log.Printf(" - %v\n", la)}log.Println()// 1. 将目标节点地址转换为mutiaddr格式desMultiaddr, err := mutiaddr.NewMultiaddr(destination)if err != nil {log.Panic(err)return nil, err}// 2. 提取目标节点的Peer ID信息info, err := peer.AddrInfoFromP2pAddr(desMultiaddr)if err != nil {log.Panic(err)return nil, err}// 3. 将目标节点的信息(id和地址)加入当前主机节点的节点池(后面的创建链接、流都需要使用)host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)// 4. 向目标节点开启流// context.Background(): 是一个空环境,一般用于初始化、测试newStream, err := host.NewStream(context.Background(), info.ID, "/chat/1.0.0")if err != nil {log.Panic(err)return nil, err}log.Println("已建立目标节点的连接")// 5. 根据新的流创建缓冲读写流返回rw := bufio.NewReadWriter(bufio.NewReader(newStream), bufio.NewWriter(newStream))return rw, nil
}func main() {// 1. 创建程序上下文环境ctx, cancel := context.WithCancel(context.Background())defer cancel()// 2. 命令行程序逻辑sourcePort := flag.Int("sp", 0, "源端口号")dest := flag.String("d", "", "目标地址字符串")help := flag.Bool("h", false, "帮助")debug := flag.Bool("debug", false, "Debug模式:每次执行都生成相同的node ID")flag.Parse() // 解析输入命令行// 帮助if *help {fmt.Printf("这是一个使用libp2p编写的简单P2P聊天程序\n\n")fmt.Println("Usage: Run './p2pChat -sp <SOURCE_PORT>' where <SOURCE_PORT> can be any port number.")fmt.Println("Now run './p2pChat -d <MULTIADDR>' where <MULTIADDR> is multiaddress of previous listener host.")os.Exit(0)}// debug模式var r io.Readerif *debug {// 创建公私钥的时候使用相同的随机源(sourcePort)使得每次生成相同的Peer ID,不要在正式环境使用// 注意:mrand是math/rand,下面的rand是crypto/randr = mrand.New(mrand.NewSource(int64(*sourcePort)))}else {r = rand.Reader}// 创建主机h, err := makeHost(ctx, *sourcePort, r)if err != nil {log.Panic(err)return}// 开启主机if *dest == "" {// 接收连接方startPeer(h, handleStream) // 将自己写的流处理函数导入}else {// 发送连接方rw, err := startPeerAndConnect(h, *dest)if err != nil {log.Panic(err)return}// 创建携程读写go readData(rw)go writeData(rw)}// 一直阻塞select {}
}
1.3 测试运行
我的环境:
- 一个公网IP服务器:47.103.203.133
- 一个本地电脑
首先启动连接接受者(公网服务器)端口1234:
go build p2pChat.go
./p2pChat -sp 1234
然后启动连接发起者本地电脑,注意:将127.0.0.1换成服务器IP地址:
./p2pChat -d /ip4/47.103.203.133/tcp/1234/p2p/QmSz6C8oMoNEAUJyQkBFbPXLZTGgRpmT85npqtYGAr9Vyb
如此,两者就可以通话了~!
二、节点发现
2.1 简介
上面的方法需要提前了解节点的地址才能够连接上,真正的p2p需要实现节点的自动发现
以下两个步骤在上面的初代版本中已经实现:
- 创建上下文环境
- 配置一个p2p host节点
- 设置本地流默认处理函数
现在节点发现需要实现以下步骤:
- 初始化一个新的DHTDHTDHT客户端与host主机对等
- 连接IPFS的引导节点(使用DHT发现附近的引导节点)
- 使用相同的键申明同一p2p节点网络(在聚会前约好地点)
- 寻找附近的对等点
- 向附近对等点开启stream
2.2 代码与解析
p2pChat.go
文件和详解如下:
package mainimport ("bufio""context""flag""fmt""github.com/ipfs/go-log/v2""github.com/libp2p/go-libp2p""github.com/libp2p/go-libp2p-core/network""github.com/libp2p/go-libp2p-core/peer""github.com/libp2p/go-libp2p-core/protocol"discovery "github.com/libp2p/go-libp2p-discovery"dht "github.com/libp2p/go-libp2p-kad-dht""github.com/multiformats/go-multiaddr""os""sync"
)// 重制一个事件名称
var logger = log.Logger("rendezvous")//
// handleStream
// @Description: 流处理函数
// @param s 新的数据流
//
func handleStream(s network.Stream) {logger.Info("获得了新的流!")// 根据新的流创建读写流rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))// 创建两个携程分别读写数据go readData(rw)go writeData(rw)// 流s始终开启,直到流两端的任何一方关闭他
}//
// readData
// @Description: 按行读取字符串输出
// @param rw
//
func readData(rw *bufio.ReadWriter) {for {s, _ := rw.ReadString('\n')if s == "" {return}if s != "\n" {fmt.Printf("\x1b[32m%s\x1b[0m>", s)}}
}//
// writeData
// @Description: 获取标准输入,按行写入数据
// @param rw
//
func writeData(rw *bufio.ReadWriter) {// 1. 创建标准输入流stdReader := bufio.NewReader(os.Stdin)for {fmt.Printf(">")// 2. 读取标准输入s, err := stdReader.ReadString('\n')if err != nil {panic(err)}// 3. 使用读写流进行写入rw.WriteString(fmt.Sprintf("%s\n", s))rw.Flush()}
}func main() {// 1.设置log日志输出等级log.SetAllLoggers(log.LevelWarn)log.SetLogLevel("rendezvous", "info")// 2. 命令行help := flag.Bool("h", false, "帮助")// 解析config, err := ParseFlags()if err != nil {panic(err)}// 帮助if *help {fmt.Println("This program demonstrates a simple p2p chat application using libp2p")fmt.Println()fmt.Println("Usage: Run './p2pChat in two different terminals. Let them connect to the bootstrap nodes, announce themselves and connect to the peers")flag.PrintDefaults()return}// 3. 创建上下文ctx, cancel := context.WithCancel(context.Background())defer cancel()// 4. 创建本地节点host, err := libp2p.New(ctx,libp2p.ListenAddrs([]multiaddr.Multiaddr(config.ListenAddresses)...), // 如果配置了监听节点就初始化加入)if err != nil {panic(err)}logger.Info("创建本地节点: ", host.ID())logger.Info(host.Addrs())// 5. 开启本地节点流处理函数host.SetStreamHandler(protocol.ID(config.ProtocolID), handleStream)// 6. 初始化DHT客户端kademliaDHT, err := dht.New(ctx, host)if err != nil {panic(err)}// 7. 引导DHT客户端,让本地节点维护自己的DHT,这样就可以在未来没有引导节点时加入新的节点logger.Debug("引导DHT中...")if err := kademliaDHT.Bootstrap(ctx); err != nil {panic(err)}// 8. 连接引导节点var wg sync.WaitGroup // 创建多携程等待池for _, peerAddr := range config.BootstrapPeers {// 转换地址格式: mutiaddr -> AddrInfopeerInfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)wg.Add(1) // 加入携程go func() {defer wg.Done() // 预先关闭携程// 连接任务if err := host.Connect(ctx, *peerInfo); err != nil {logger.Warn(err)}else {logger.Info("成功连接引导节点: ", peerInfo.ID)}}()}wg.Wait() // 阻塞等待所有携程结束// 9. 使用相同的键申明同一p2p节点网络(在聚会前约好地点)logger.Info("正在声明当前网络...")// RoutingDiscovery是内容路由的一个实例routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)// 持续的申明一个服务并用一个键唯一标记discovery.Advertise(ctx, routingDiscovery, config.RendezvousString)logger.Info("成功申明网络!唯一标识符:", config.RendezvousString)// 10. 在申明的网络中寻找其他节点logger.Info("正在寻找其他节点")peerChan, err := routingDiscovery.FindPeers(ctx, config.RendezvousString) // 返回的是一个AddrInfo Channelif err != nil {panic(err)}// 遍历for peer := range peerChan {if peer.ID == host.ID() {continue}logger.Debug("找到新节点: ", peer.ID)logger.Debug("开始连接新节点: ", peer.ID)stream, err := host.NewStream(ctx, peer.ID, protocol.ID(config.ProtocolID))if err != nil {logger.Warn("连接节点 ", peer.ID, " 失败!")logger.Warn(err)continue}else {// 成功连接,创建读写流rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))go writeData(rw)go readData(rw)}logger.Info("成功连接节点: ", peer.ID)}select {}
}
flag.go
文件如下:
package mainimport ("flag"dht "github.com/libp2p/go-libp2p-kad-dht"maddr "github.com/multiformats/go-multiaddr""strings"
)// 重命名多地址数组类型
type addrList []maddr.Multiaddr// 将flag命令输入数据存储在自定义结构体中需要实现Value接口,需要实现两个抽象函数String和Set
func (al *addrList) String() string {strs := make([]string, len(*al))for i, addr := range *al {strs[i] = addr.String()}return strings.Join(strs, ",")
}func (al *addrList) Set(value string) error {addr, err := maddr.NewMultiaddr(value)if err != nil {return err}*al = append(*al, addr)return nil
}func StringsToAddrs(addrStrings []string) (maddrs []maddr.Multiaddr, err error) {for _, addrString := range addrStrings {addr, err := maddr.NewMultiaddr(addrString)if err != nil {return maddrs, err}maddrs = append(maddrs, addr)}return
}// 本地节点配置环境(代替数据库)
type Config struct {RendezvousString stringBootstrapPeers addrList // 引导节点集合ListenAddresses addrList // 监听节点集合ProtocolID string
}func ParseFlags() (Config, error){config := Config{}flag.StringVar(&config.RendezvousString, "rendezvous", "19021902","标识一组节点的唯一字符串。与你的朋友分享这些,让他们与你联系")flag.Var(&config.BootstrapPeers, "peer", "向当前节点添加一组引导节点数组(mutiaddress格式)")flag.Var(&config.ListenAddresses, "listen", "向当前节点添加一组监听节点数组(mutiaddress格式)")flag.StringVar(&config.ProtocolID, "pid", "/p2pChat/1.1.0", "给stram Hander设置一个协议号")flag.Parse()if len(config.BootstrapPeers) == 0 {// 如果没有设置引导节点,那么就使用dht默认的引导节点集合config.BootstrapPeers = dht.DefaultBootstrapPeers}return config, nil
}
2.3 测试运行
go build -o p2pChat ./
./p2pChat -listen /ip4/0.0.0.0/tcp/8888
# 另一台机器
./p2pChat -listen /ip4/0.0.0.0/tcp/6666
三、总结
3.1 libp2p节点发现构建流程
3.2 libp2p中地址的转换关系
学习资料来源:
https://github.com/libp2p/go-libp2p/tree/master/examples/chat
https://github.com/libp2p/go-libp2p/tree/master/examples/chat-with-rendezvous
觉得不错的话,请点赞关注呦~~你的关注就是博主的动力
关注公众号,查看更多go开发、密码学和区块链科研内容:
P2P网络编程-2-案例实践:P2P聊天应用相关推荐
- P2P网络编程-3-案例实践:PubSub
libp2p网络通信中还有一种方式就是PubSub模式,也称订阅发布的模式,官方给出了订阅发布模式的一个案例=> 聊天室 在此学习记录一下 官方代码地址:https://github.com/l ...
- Java网络编程学习——简单模拟在线聊天
Java网络编程学习--简单模拟在线聊天 学了java网络,也是该做个小案例来巩固一下了. 本次案例将使用UDP和多线程模拟即时聊天,简单练练手. 1.前提知识 需要知道简单的IO流操作,以及简单的U ...
- 【区块链实战】什么是 P2P 网络,区块链和 P2P 网络有什么关系
目录 一.简介 二.知识点 P2P 网络 区块链节点与 P2P 的关系 区块链节点功能分类 P2P 网络特征 三.什么是 P2P 网络,区块链式使用 P2P 网络做什么 1.P2P 网络概念 2.P2 ...
- Visual C++网络编程经典案例详解 第9章 实用播放器 数据读取与播放控制 识别数据文件信息
识别数据文件信息主要是指对mp3数据格式识别 定义顺序代码如下 typedef struct mp3_struct //自定义mp3结构体 {char heade[3]; //tag字符标记char ...
- Java的网络编程【TCP与UDP聊天小程序】
Java的网络编程[TCP与UDP聊天小程序] 1. TCP协议 1.1 传输控制协议(Transmission Control Protocol),是一种**面向连接(全程保持连接)**的协议,类似 ...
- c++网络编程:实现简单的聊天——基于服务器/客户端的模式
c++ socket编程 服务器/客户端 最近在学c++网路编程,写个帖子记录自己的学习过程,部分内容参考一个博主的,附上他的链接: link 实现: 客户端往服务器端发送一条数据,服务器端接收数据并 ...
- 【网络编程】用Socket实现聊天小程序
客户端: 1 package day18_网络编程; 2 3 import java.io.FileInputStream; 4 import java.io.IOException; 5 impor ...
- 史上最简单的spark教程第十三章-SparkSQL编程Java案例实践(终章)
Spark-SQL的Java实践案例(五) 本章核心:JDBC 连接外部数据库,sparkSQL优化,故障监测 史上最简单的spark教程 所有代码示例地址:https://github.com/My ...
- C#网络编程的最佳实践
网络框架的选择 C++语言里面有asio和libuv等网络库, 可以方便的进行各种高效编程. 但是C#里面, 情况不太一样, C#自带的网络API有多种. 例如: Socket TcpStream(同 ...
最新文章
- 【数据结构】顺序表的应用(4)(C语言)
- redis 的 HyperLogLog
- 用户被忽悠 微软黑屏计划推至21日0点实施
- 密度图的密度估计_基于核密度的宝鸡地名文化特征与时空分布研究
- 开机启动项_Windows10开机速度:其实手动设置可以更快!
- 计算机网络的拓扑结构主要取决于它的( )
- Python学习秘籍 这些窍门就连老司机都不一定知道 值得学习
- java 数据类型及作用域、数据类型转换、运算符、流程控制
- 何时不应该使用 Rails?
- 2021牛客寒假算法基础集训营1,签到题ABFIJ
- 报告节选2:桌面虚拟化需重视网络和存储
- php提示Undefined index的解决方法
- miniprogrampatch 提供 watch 和 computed 特性
- 布线问题—分支限界—java实现
- bcnf分解算法_BCNF的保持无损连接的分解
- 东南亚外卖平台分析报告
- php递归算法计算n 介乘,递归算法示例——计算N的阶乘
- 增强现实(AR)智能眼镜的关键技术:标定、跟踪与交互
- 大学生程序设计邀请赛(华东师范大学)A
- OSI七层模型与TCP/IP四层模型详解
热门文章
- Android中测试多国语言漏翻的情况(一)
- win10玩cf不能全屏_80后经典游戏:红色警戒2(可支持win10系统)
- 河北省电子工程高级职称公示_2019年石家庄职称评审,电子工程通过名单!
- 美国登月,支持者与反对者有何差异
- 相关性的基本概念「AI基础」
- Quarters II报Verilog语法near text “ã“; expecting a direction错误
- Vue中手动导出Element表格为pdf/word/excel格式
- Prometheus监控入门级使用教程
- 智能眼镜产业现状及开发者的新机遇
- 机器人关节伺服电机三环控制方式