From e5e8c1c19fab34db1c05152ce85b95be3f07c42b Mon Sep 17 00:00:00 2001 From: ayflying Date: Tue, 14 Oct 2025 21:44:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E5=AE=8C=E6=88=90=E5=86=85?= =?UTF-8?q?=E7=BD=91=E8=BF=9E=E6=8E=A5=EF=BC=8C=E5=A4=96=E7=BD=91=E6=89=93?= =?UTF-8?q?=E6=B4=9E=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.MD | 124 +++++++++++++++++++++++++++++++++- internal/logic/p2p/client.go | 56 ++++++++------- internal/logic/p2p/gateway.go | 10 +-- 3 files changed, 158 insertions(+), 32 deletions(-) diff --git a/README.MD b/README.MD index d36cedd..07585ec 100644 --- a/README.MD +++ b/README.MD @@ -1,4 +1,122 @@ -# GoFrame Template For SingleRepo +# P2P 通信系统 -Quick Start: -- https://goframe.org/quick \ No newline at end of file +一个基于 Go 语言和 libp2p 构建的对等网络通信系统,提供可靠的分布式节点间通信功能。 + +## 功能特点 + +- 基于 libp2p 的去中心化 P2P 网络通信 +- GoFrame 框架支持的高效 Web 服务 +- Redis 缓存集成 +- 模块化架构设计 +- 命令行工具支持 + +## 技术栈 + +- **Go 语言**: 1.24.0 +- **框架**: GoFrame v2.9.3 +- **P2P 库**: libp2p v0.43.0 +- **存储**: Redis +- **多地址格式**: multiaddr v0.16.0 + +## 安装要求 + +- Go 1.24.0 或更高版本 +- Redis 服务器 +- Git + +## 快速开始 + +### 克隆项目 + +```bash +git clone https://github.com/ayflying/p2p.git +cd p2p +``` + +### 安装依赖 + +```bash +go mod tidy +``` + +### 构建项目 + +```bash +go build -o p2p +``` + +### 运行项目 + +```bash +./p2p +``` + +## 目录结构 + +``` +├── api/ # API 定义 +│ └── p2p/ # P2P 相关 API +├── internal/ # 内部代码 +│ ├── cmd/ # 命令行工具 +│ ├── consts/ # 常量定义 +│ ├── controller/ # 控制器 +│ ├── logic/ # 业务逻辑 +│ ├── model/ # 数据模型 +│ ├── packed/ # 打包文件 +│ └── service/ # 服务层 +├── main.go # 程序入口 +├── go.mod # Go 模块定义 +├── go.sum # 依赖锁定文件 +├── Makefile # 构建脚本 +└── README.MD # 项目文档 +``` + +## 核心模块 + +### P2P 通信 + +提供基于 libp2p 的节点发现、连接和通信功能,支持分布式网络中的节点间数据传输。 + +### Web 服务 + +基于 GoFrame 框架构建的 Web 服务,提供 RESTful API 接口。 + +### 缓存系统 + +集成 Redis 缓存,提高系统性能和数据访问效率。 + +## 命令行工具 + +项目提供了命令行工具,支持节点管理、网络状态查询等功能。 + +```bash +./p2p --help # 查看帮助信息 +``` + +## 开发指南 + +### 代码规范 + +- 遵循 Go 语言标准编码规范 +- 保持代码简洁、可读 +- 添加适当的注释 + +### 提交代码 + +1. Fork 项目仓库 +2. 创建功能分支 +3. 提交代码变更 +4. 创建 Pull Request + +## 许可证 + +[MIT License](https://opensource.org/licenses/MIT) + +## 联系方式 + +如有问题或建议,请通过以下方式联系我们: +- GitHub Issues: https://github.com/ayflying/p2p/issues + +--- + +*最后更新时间: 2024-07-15* \ No newline at end of file diff --git a/internal/logic/p2p/client.go b/internal/logic/p2p/client.go index 922026f..95b3345 100644 --- a/internal/logic/p2p/client.go +++ b/internal/logic/p2p/client.go @@ -10,6 +10,7 @@ import ( "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/gtimer" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/libp2p/go-libp2p" @@ -17,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" ) @@ -48,17 +48,13 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) { peers: make(map[string]peer.ID), } + g.Log().Debugf(ctx, "当前p2p的分享地址:%v", hostObj.Addrs()) // 设置流处理函数(处理P2P消息) hostObj.SetStreamHandler(ProtocolID, s.handleStream) // 连接网关(WebSocket) if err = s.connectGateway(); err != nil { - glog.Fatalf(ctx, "连接网关失败: %v", err) - } - - // 注册到网关 - if err = s.register(); err != nil { - glog.Fatalf(ctx, "注册到网关失败: %v", err) + g.Log().Fatalf(ctx, "连接网关失败: %v", err) } // 启动网关消息接收协程 @@ -76,14 +72,15 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) { // 创建libp2p主机 func (s *sP2P) createLibp2pHost(ctx context.Context, port int) (host.Host, error) { // 配置监听地址 - listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port) + //listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port) // 创建主机 h, err := libp2p.New( - libp2p.ListenAddrStrings(listenAddr), - libp2p.DefaultTransports, - libp2p.DefaultMuxers, - libp2p.DefaultSecurity, + ////libp2p.ListenAddrStrings(listenAddr), + //libp2p.ListenAddrs(), + //libp2p.DefaultTransports, + //libp2p.DefaultMuxers, + //libp2p.DefaultSecurity, ) return h, err } @@ -97,6 +94,11 @@ func (s *sP2P) connectGateway() (err error) { //defer conn.Close() s.client.wsConn = conn + + // 注册到网关 + if err = s.register(); err != nil { + g.Log().Fatalf(ctx, "注册到网关失败: %v", err) + } return } @@ -164,7 +166,7 @@ func (s *sP2P) SendData(targetID string, data []byte) error { } // 创建流 - stream, err := s.client.host.NewStream(gctx.New(), peerID, protocol.ID("/p2p-chat/1.0.0")) + stream, err := s.client.host.NewStream(gctx.New(), peerID, ProtocolID) if err != nil { return err } @@ -242,6 +244,11 @@ func (s *sP2P) receiveGatewayMessages() { _, data, err := s.client.wsConn.ReadMessage() if err != nil { glog.Errorf(ctx, "接收网关消息失败: %v", err) + + gtimer.SetTimeout(ctx, 5*time.Second, func(ctx context.Context) { + err = s.connectGateway() + return + }) return } @@ -251,22 +258,23 @@ func (s *sP2P) receiveGatewayMessages() { continue } - // 验证消息是否发给自己(to必须是当前客户端ID或空) - if msg.To != "" && msg.To != s.client.Id { - g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To) - continue - } + //// 验证消息是否发给自己(to必须是当前客户端ID或空) + //if msg.To != "" && msg.To != s.client.Id { + // g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To) + // continue + //} // 处理不同类型消息 switch msg.Type { case MsgTypeRegisterAck: - glog.Infof(ctx, "注册成功") + g.Log().Infof(ctx, "注册成功") case MsgTypeDiscoverAck: var msgData struct { - Found bool `json:"found"` - PeerID string `json:"peer_id,omitempty"` - Addrs []string `json:"addrs,omitempty"` + Found bool `json:"found"` + PeerID string `json:"peer_id,omitempty"` + Addrs []string `json:"addrs,omitempty"` + TargetID string `json:"target_id"` } if err = gjson.DecodeTo(msg.Data, &msgData); err != nil { g.Log().Errorf(ctx, "解析发现响应失败: %v", err) @@ -274,7 +282,7 @@ func (s *sP2P) receiveGatewayMessages() { } if !msgData.Found { - fmt.Println("未找到目标节点") + fmt.Println("gateway未找到目标节点") continue } @@ -310,7 +318,7 @@ func (s *sP2P) receiveGatewayMessages() { g.Log().Infof(ctx, "成功连接到目标节点:%v", targetID) s.client.peers[targetID] = targetPeerID - }(peerID, msg.To) + }(peerID, msgData.TargetID) case MsgTypePunchRequest: err = s.handlePunchRequest(msg.Data) diff --git a/internal/logic/p2p/gateway.go b/internal/logic/p2p/gateway.go index d57d4a6..31fba5c 100644 --- a/internal/logic/p2p/gateway.go +++ b/internal/logic/p2p/gateway.go @@ -217,7 +217,7 @@ func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) { fromClient.LastActive = time.Now() s.lock.Unlock() - if targetClient != nil { + if targetClient == nil { // 目标不存在 s.sendMessage(conn, GatewayMessage{ Type: MsgTypeDiscoverAck, @@ -237,10 +237,10 @@ func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) { From: "gateway", // 发送方是网关 To: msg.From, // 接收方是原请求方 Data: gjson.MustEncode(g.Map{ - "found": true, - "peer_id": targetClient.PeerID, - //"addrs": s.getAddrsJSON(targetClient.Addrs), - "addrs": targetClient.Addrs, + "found": true, + "peer_id": targetClient.PeerID, + "addrs": targetClient.Addrs, + "target_id": data.TargetID, }), })