From f4bf11c1364819dda54c563ce120ad774758235c Mon Sep 17 00:00:00 2001 From: ayflying Date: Tue, 21 Oct 2025 18:47:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=B3=E9=97=AD=E5=85=AC=E5=85=B1=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E7=9A=84=E9=93=BE=E6=8E=A5=E6=96=B9=E5=BC=8F=EF=BC=8C?= =?UTF-8?q?=E5=8F=AA=E8=BF=9E=E6=8E=A5=E6=9C=AC=E5=9C=B0=E7=A7=81=E6=9C=89?= =?UTF-8?q?=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmd/cmd.go | 99 ++++++++++------- internal/cmd/dht.go | 6 +- internal/cmd/p2p.go | 28 +++-- internal/logic/p2p/client.go | 15 +-- internal/logic/p2p/dht.go | 194 ++++++++++++++++++++++------------ internal/logic/p2p/gateway.go | 8 +- internal/logic/p2p/p2p.go | 5 +- internal/service/p_2_p.go | 4 +- 8 files changed, 222 insertions(+), 137 deletions(-) diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 1b89ce5..3af272f 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -2,15 +2,13 @@ package cmd import ( "context" + "fmt" "time" - "github.com/ayflying/p2p/internal/controller/p2p" "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/gcmd" "github.com/gogf/gf/v2/os/gctx" - "github.com/gogf/gf/v2/os/gtimer" ) func init() { @@ -32,47 +30,68 @@ var ( g.Log().Debug(ctx, "开始执行main") parser, err = gcmd.Parse(g.MapStrBool{ - "w,ws": true, - "g,gateway": true, - "p,port": true, + "p,port": true, }) - addr := g.Cfg().MustGet(ctx, "ws.address").String() - ws := parser.GetOpt("ws", addr).String() - //port := parser.GetOpt("port", 0).Int() + port := parser.GetOpt("port", "23333").Int() - s.Group("/", func(group *ghttp.RouterGroup) { - group.Middleware(ghttp.MiddlewareHandlerResponse) - group.Bind( - p2p.NewV1(), - ) - }) + h, _ := service.P2P().CreateLibp2pHost(gctx.New(), port) + err = service.P2P().DHTStart(h, nil) + if err != nil { + g.Log().Error(ctx, err) + } - //启动p2p服务端网关 - s.Group("/ws", func(group *ghttp.RouterGroup) { - group.Middleware(ghttp.MiddlewareHandlerResponse) - err = service.P2P().GatewayStart(ctx, group) - if err != nil { - g.Log().Error(ctx, err) - } - }) + time.Sleep(5 * time.Second) + publicIp, _ := service.P2P().GetIPv4PublicIP() + validKey := fmt.Sprintf("%v/ip", h.ID()) + dataValue := fmt.Sprintf("来自节点 %s 的数据:%v", h.ID().ShortString(), publicIp) + if err = service.P2P().StoreToDHT(gctx.New(), validKey, dataValue); err != nil { + g.Log().Debugf(ctx, "❌ 存储失败: %v\n", err) + } else { + g.Log().Debugf(ctx, "✅ 存储成功\nKey: %s\nValue: %s\n", validKey, dataValue) + } - //s.SetPort(port) - - // 延迟启动 - gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { - g.Log().Debug(ctx, "开始执行客户端") - // 启动p2p客户端 - err = service.P2P().Start(ctx, ws) - - g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) - //addrs, _ := net.InterfaceAddrs() - //for _, addr := range addrs { - // if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { - // g.Log().Infof(ctx, "访问地址:http://%v:%d", ipnet.IP.String(), s.GetListenedPort()) - // } - //} - - }) + //parser, err = gcmd.Parse(g.MapStrBool{ + // "w,ws": true, + // "g,gateway": true, + // "p,port": true, + //}) + //addr := g.Cfg().MustGet(ctx, "ws.address").String() + //ws := parser.GetOpt("ws", addr).String() + ////port := parser.GetOpt("port", 0).Int() + // + //s.Group("/", func(group *ghttp.RouterGroup) { + // group.Middleware(ghttp.MiddlewareHandlerResponse) + // group.Bind( + // p2p.NewV1(), + // ) + //}) + // + ////启动p2p服务端网关 + //s.Group("/ws", func(group *ghttp.RouterGroup) { + // group.Middleware(ghttp.MiddlewareHandlerResponse) + // err = service.P2P().GatewayStart(ctx, group) + // if err != nil { + // g.Log().Error(ctx, err) + // } + //}) + // + ////s.SetPort(port) + // + //// 延迟启动 + //gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { + // g.Log().Debug(ctx, "开始执行客户端") + // // 启动p2p客户端 + // err = service.P2P().Start(ws) + // + // g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) + // //addrs, _ := net.InterfaceAddrs() + // //for _, addr := range addrs { + // // if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { + // // g.Log().Infof(ctx, "访问地址:http://%v:%d", ipnet.IP.String(), s.GetListenedPort()) + // // } + // //} + // + //}) s.Run() return nil diff --git a/internal/cmd/dht.go b/internal/cmd/dht.go index 41c6eea..82e9b98 100644 --- a/internal/cmd/dht.go +++ b/internal/cmd/dht.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "time" "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/frame/g" @@ -28,15 +29,16 @@ var ( parser, err = gcmd.Parse(g.MapStrBool{ "p,port": true, }) - port := parser.GetOpt("port", "0").Int() + port := parser.GetOpt("port", "23333").Int() h, _ := service.P2P().CreateLibp2pHost(ctx, port) - err = service.P2P().DHTStart(ctx, h, nil) + err = service.P2P().DHTStart(h, nil) if err != nil { g.Log().Error(ctx, err) } go func() { + time.Sleep(5 * time.Second) publicIp, _ := service.P2P().GetIPv4PublicIP() validKey := fmt.Sprintf("%v/ip", h.ID()) dataValue := fmt.Sprintf("来自节点 %s 的数据:%v", h.ID().ShortString(), publicIp) diff --git a/internal/cmd/p2p.go b/internal/cmd/p2p.go index 2bdb53b..8c9057f 100644 --- a/internal/cmd/p2p.go +++ b/internal/cmd/p2p.go @@ -77,12 +77,12 @@ var ( //addr := "/ip4/192.168.50.173/tcp/51888/p2p/12D3KooWJKBB9bF9MjqgsFYUUsPBG249FDq7a3ZdaYc9iw8G78JQ" //addrs := "WyIvaXA0LzEyNy4wLjAuMS90Y3AvNTE4ODgiLCIvaXA0LzE5Mi4xNjguNTAuMTczL3RjcC81MTg4OCJd" wsStr := "ws://192.168.50.173:51888/ws" - err = service.P2P().Start(ctx, wsStr) + err = service.P2P().Start(wsStr) case "dht": g.Log().Debug(ctx, "开始执行dht") - h, _ := service.P2P().CreateLibp2pHost(ctx, 0) + h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) - err := service.P2P().DHTStart(ctx, h, nil) + err := service.P2P().DHTStart(h, nil) if err != nil { g.Log().Error(ctx, err) } @@ -98,17 +98,23 @@ var ( case "dht2": g.Log().Debug(ctx, "开始执行dht2") - h, _ := service.P2P().CreateLibp2pHost(ctx, 0) + h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) - addr := []string{ - //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - //"/ip4/114.132.176.115/tcp/23333/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", - //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", - } + //addr := []string{ + // "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // + // "/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // "/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // + // //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + // //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + // //"/ip4/114.132.176.115/tcp/23333/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + // //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + //} id := gcmd.GetOpt("id").String() - err := service.P2P().DHTStart(ctx, h, addr) + err := service.P2P().DHTStart(h, nil) if err != nil { g.Log().Error(ctx, err) } diff --git a/internal/logic/p2p/client.go b/internal/logic/p2p/client.go index c24bbad..bb228ad 100644 --- a/internal/logic/p2p/client.go +++ b/internal/logic/p2p/client.go @@ -71,8 +71,8 @@ func (s *sP2P) linkTcp(addr string) { //defer gtcpConn.Close() } -func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) { - +func (s *sP2P) Start(wsStr string) (err error) { + var ctx = gctx.New() hostObj, err := s.CreateLibp2pHost(ctx, 0) if err != nil { g.Log().Error(ctx, err) @@ -97,7 +97,7 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) { } // 启动网关消息接收协程 - go s.receiveGatewayMessages() + go s.receiveGatewayMessages(ctx) g.Log().Infof(ctx, "已连接网关成功,客户端ID: %s", s.client.Id) //g.Log().Infof(ctx,"当前地址:http://127.0.0.1/") @@ -134,8 +134,8 @@ func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error libp2p.DefaultSecurity, // 增加NAT端口映射尝试时间 //libp2p.NATPortMapTimeout(30*time.Second), - // 禁用Relay(如果需要中继,可保留) - libp2p.DisableRelay(), + + //libp2p.DisableRelay(), // 禁用Relay(如果需要中继,可保留) libp2p.NATPortMap(), // 自动尝试路由器端口映射(跨网络必备) ) if err != nil { @@ -148,6 +148,7 @@ func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error // 连接网关(WebSocket) func (s *sP2P) connectGateway() (err error) { + var ctx = gctx.New() conn, _, err := websocket.DefaultDialer.Dial(s.client.gatewayURL, nil) if err != nil { gtimer.SetTimeout(ctx, 3*time.Minute, func(ctx context.Context) { @@ -211,6 +212,7 @@ func (s *sP2P) DiscoverAndConnect(targetID string) error { // 处理P2P流 func (s *sP2P) handleStream(stream network.Stream) { + ctx := gctx.New() defer stream.Close() peerID := stream.Conn().RemotePeer().String() @@ -252,6 +254,7 @@ func (s *sP2P) sendData(targetID string, data []byte) error { // 处理网关的打洞请求 func (s *sP2P) handlePunchRequest(data json.RawMessage) error { + ctx := gctx.New() var punchData struct { FromID string `json:"from_id"` PeerID string `json:"peer_id"` @@ -312,7 +315,7 @@ func (s *sP2P) sendToGateway(msg GatewayMessage) (err error) { } // 接收网关消息 -func (s *sP2P) receiveGatewayMessages() { +func (s *sP2P) receiveGatewayMessages(ctx context.Context) { for { _, data, err := s.client.wsConn.ReadMessage() if err != nil { diff --git a/internal/logic/p2p/dht.go b/internal/logic/p2p/dht.go index 0bde3b1..a92865a 100644 --- a/internal/logic/p2p/dht.go +++ b/internal/logic/p2p/dht.go @@ -9,6 +9,7 @@ import ( "github.com/gogf/gf/v2/crypto/gsha1" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gctx" //"github.com/ipfs/boxo/ipns" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -31,9 +32,23 @@ var ( ) // 初始化无服务器DHT(作为节点加入DHT网络) -func (s *sP2P) DHTStart(ctx context.Context, h host.Host, bootstrapPeers []string) (err error) { +func (s *sP2P) DHTStart(h host.Host, bootstrapPeers []string) (err error) { + ctx := gctx.New() + //打印节点地址(供其他节点手动加入时使用) s.printNodeAddrs(h) + + if len(bootstrapPeers) == 0 { + bootstrapPeers = []string{ + "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + + "/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + "/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + } + + } + s.dht.bootstrapPeers = bootstrapPeers // 2. 通过官方 Bootstrap 节点加入公共 DHT 网络(完全去中心化入口) @@ -41,10 +56,10 @@ func (s *sP2P) DHTStart(ctx context.Context, h host.Host, bootstrapPeers []strin if err != nil { log.Fatalf("加入 DHT 网络失败: %v", err) } - fmt.Println("✅ 成功加入完全去中心化 DHT 网络") + fmt.Println("✅ 成功启动完全去中心化 DHT 网络") // 3. 定期打印路由表(观察节点自动发现效果) - go s.printRoutingTable(s.dht.KadDHT, 60*time.Second) + go s.printRoutingTable(ctx, s.dht.KadDHT, 60*time.Second) return } @@ -62,6 +77,11 @@ type NoOpValidator struct{} // Validate 总是返回成功,允许任何数据 func (v *NoOpValidator) Validate(key string, value []byte) error { + g.Log().Debugf(gctx.New(), "当前有数据进行保存:key: %s, value: %s", key, value) + // 限制数据大小(防止超大数据占用资源) + if len(value) > 1024*1024 { // 1MB上限 + return fmt.Errorf("数据超过1MB,拒绝存储") + } return nil } @@ -70,10 +90,29 @@ func (v *NoOpValidator) Select(key string, values [][]byte) (int, error) { return 0, nil } +// 清空 Peerstore 中的所有节点缓存(替代 Clear() 方法) +func (s *sP2P) clearPeerstore(ps peerstore.Peerstore) { + // 1. 获取所有缓存的节点 ID + peers := ps.Peers() + for _, p := range peers { + // 2. 删除该节点的所有地址 + ps.ClearAddrs(p) + // 3. 移除该节点的所有元数据(如协议、密钥等) + ps.RemovePeer(p) + } + fmt.Println("✅ Peerstore 缓存已清空(旧节点地址已删除)") +} + // 加入全球公共 DHT 网络(通过官方 Bootstrap 节点,实现完全去中心化) func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.IpfsDHT, error) { + // 关键:启动后先清空 Peerstore 缓存(删除旧公网节点) + s.clearPeerstore(localHost.Peerstore()) + // 创建 DHT 实例(ModeServer:作为完整节点参与存储和路由) - kadDHT, err := dht.New(ctx, localHost, dht.Mode(dht.ModeServer)) + kadDHT, err := dht.New( + ctx, + localHost, dht.Mode(dht.ModeServer), + ) if err != nil { return nil, err } @@ -85,58 +124,61 @@ func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.Ipf seedPeers, _ := s.parseSeedNodes(s.dht.bootstrapPeers) for _, p := range seedPeers { localHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) - if err = localHost.Connect(ctx, p); err != nil { - fmt.Printf("⚠️ 连接本地种子节点 %s 失败: %v\n", p.ID.ShortString(), err) + // 带超时的连接,确保失败后能释放 + connCtx, connCancel := context.WithTimeout(ctx, 20*time.Second) + err = localHost.Connect(connCtx, p) + if err != nil { + g.Log().Debugf(connCtx, "⚠️ 连接本地种子节点 %s 失败: %v\n", p.ID.ShortString(), err) } else { - fmt.Printf("✅ 连接本地种子节点成功: %s\n", p.ID.ShortString()) + g.Log().Debugf(connCtx, "✅ 连接本地种子节点成功: %s\n", p.ID.ShortString()) } - if err != nil { - fmt.Printf("⚠️ 连接私有节点 %s 失败: %v\n", p.ID.ShortString(), err) - continue - } - fmt.Printf("✅ 连接本地种子节点成功: %s\n", p.ID.ShortString()) - success = true - } - if !success { - return nil, fmt.Errorf("所有本地种子节点连接失败") - } - } else { - - // 连接 libp2p 官方 Bootstrap 节点(仅作为初始入口) - officialBootstrapPeers := dht.DefaultBootstrapPeers // 官方节点列表 - fmt.Println("正在连接官方 Bootstrap 节点(初始入口)...") - - for _, addr := range officialBootstrapPeers { - peerInfo, err := peer.AddrInfoFromP2pAddr(addr) - if err != nil { - fmt.Printf("⚠️ 解析官方节点失败: %v\n", err) - continue - } - - // 添加节点地址到本地地址簿 - localHost.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL) - - // 尝试连接(超时 10 秒) - connCtx, connCancel := context.WithTimeout(ctx, 10*time.Second) - err = localHost.Connect(connCtx, *peerInfo) connCancel() - - if err != nil { - fmt.Printf("⚠️ 连接官方节点 %s 失败: %v\n", peerInfo.ID.ShortString(), err) - continue - } - fmt.Printf("✅ 连接官方节点成功: %s\n", peerInfo.ID.ShortString()) success = true } - - // 只要连接上至少一个官方节点,即可加入网络(后续会自动发现更多节点) if !success { - return nil, fmt.Errorf("无法连接任何官方 Bootstrap 节点,无法加入网络") + g.Log().Debugf(ctx, "所有本地种子节点连接失败") } } - // 启动 DHT(自动发现其他节点,构建路由表,脱离对官方节点的依赖) - if err = kadDHT.Bootstrap(ctx); err != nil { + //if !success { + // // 连接 libp2p 官方 Bootstrap 节点(仅作为初始入口) + // officialBootstrapPeers := dht.DefaultBootstrapPeers // 官方节点列表 + // fmt.Println("正在连接官方 Bootstrap 节点(初始入口)...") + // + // for _, addr := range officialBootstrapPeers { + // peerInfo, err := peer.AddrInfoFromP2pAddr(addr) + // if err != nil { + // fmt.Printf("⚠️ 解析官方节点失败: %v\n", err) + // continue + // } + // + // // 添加节点地址到本地地址簿 + // localHost.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL) + // + // // 尝试连接(超时 10 秒) + // connCtx, connCancel := context.WithTimeout(ctx, 20*time.Second) + // err = localHost.Connect(connCtx, *peerInfo) + // connCancel() + // + // if err != nil { + // fmt.Printf("⚠️ 连接官方节点 %s 失败: %v\n", peerInfo.ID.ShortString(), err) + // continue + // } + // fmt.Printf("✅ 连接官方节点成功: %s\n", peerInfo.ID.ShortString()) + // success = true + // } + // + // // 只要连接上至少一个官方节点,即可加入网络(后续会自动发现更多节点) + // if !success { + // return nil, fmt.Errorf("无法连接任何官方 Bootstrap 节点,无法加入网络") + // } + //} + + // 启动 DHT(自动发现其他节点,构建路由表,脱离对官方节点的依赖,带超时,避免阻塞) + bootCtx, bootCancel := context.WithTimeout(ctx, 30*time.Second) + err = kadDHT.Bootstrap(bootCtx) + bootCancel() + if err != nil { return nil, fmt.Errorf("DHT 初始化失败: %v", err) } @@ -149,12 +191,14 @@ func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.Ipf func (s *sP2P) StoreToDHT(ctx context.Context, key string, value string) (err error) { key = s.generateStringDHTKey(key) key = fmt.Sprintf("%s/%s", ProtocolID, key) - g.Log().Debugf(ctx, "StoreToDHT key: %s, value: %s", key, value) - // 存储到本地 - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - if err = s.dht.KadDHT.PutValue(ctx, key, []byte(value)); err != nil { + // 2. 带超时的存储,避免长期阻塞 + storeCtx, storeCancel := context.WithTimeout(ctx, 60*time.Second) + defer storeCancel() + + g.Log().Debugf(storeCtx, "StoreToDHT key: %s, value: %s", key, value) + err = s.dht.KadDHT.PutValue(storeCtx, key, []byte(value)) + if err != nil { return fmt.Errorf("本地存储失败: %v", err) } @@ -171,25 +215,29 @@ func (s *sP2P) FindFromDHT(ctx context.Context, key string) (string, error) { g.Log().Debugf(ctx, "FindFromDHT key: %s", key) // 1. 先检查本地是否存储了数据(本地节点可能已保存) - localValue, err := s.dht.KadDHT.GetValue(ctx, key) + localCtx, localCancel := context.WithTimeout(ctx, 5*time.Second) + defer localCancel() + localValue, err := s.dht.KadDHT.GetValue(localCtx, key) if err == nil { g.Log().Debugf(ctx, "✅ 本地查找成功(数据在当前节点)") return string(localValue), nil } - g.Log().Debugf(ctx, "⚠️ 本地查找失败: %v,开始重试网络查找...", err) + g.Log().Debugf(ctx, "⚠️ 本地查找失败: %v,开始重试网络查找...", err) // 2. 多次重试网络查找 for i := 0; i < maxRetries; i++ { - ctx2, cancel := context.WithTimeout(ctx, 120*time.Second) // 本地测试超时短一些 - defer cancel() - - g.Log().Debugf(ctx2, "🔍 第%d次查找(共%d次)...", i+1, maxRetries) - value, err := s.dht.KadDHT.GetValue(ctx2, key) - if err == nil { - g.Log().Debugf(ctx2, "✅ 第%d次查找成功", i+1) + //ctx2, cancel := context.WithTimeout(ctx, 120*time.Second) // 本地测试超时短一些 + //defer cancel() + findCtx, findCancel := context.WithTimeout(ctx, 60*time.Second) + g.Log().Debugf(findCtx, "🔍 第%d次查找(共%d次)...", i+1, maxRetries) + value, err2 := s.dht.KadDHT.GetValue(findCtx, key) + findCancel() + if err2 == nil { + g.Log().Debugf(findCtx, "✅ 第%d次查找成功", i+1) return string(value), nil + } - g.Log().Debugf(ctx2, "⚠️ 第%d次查找失败: %v,等待重试...", i+1, err) + g.Log().Debugf(ctx, "⚠️ 第%d次查找失败: %v,等待重试...", i+1, err2) time.Sleep(retryInterval) } @@ -197,19 +245,25 @@ func (s *sP2P) FindFromDHT(ctx context.Context, key string) (string, error) { } // 定期打印路由表(观察节点自动发现情况) -func (s *sP2P) printRoutingTable(kadDHT *dht.IpfsDHT, interval time.Duration) { +func (s *sP2P) printRoutingTable(ctx context.Context, kadDHT *dht.IpfsDHT, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { - <-ticker.C - peers := kadDHT.RoutingTable().ListPeers() - fmt.Printf("\n📊 当前路由表节点数: %d(完全去中心化网络节点)\n", len(peers)) - if len(peers) > 0 { - fmt.Println("前 5 个节点 ID:") - for i, p := range peers[:min(5, len(peers))] { - fmt.Printf(" %d. %s\n", i+1, p.ShortString()) + select { + case <-ctx.Done(): + fmt.Println("路由表打印goroutine已退出") + return + case <-ticker.C: + peers := kadDHT.RoutingTable().ListPeers() + fmt.Printf("\n📊 当前路由表节点数: %d(完全去中心化网络节点)\n", len(peers)) + if len(peers) > 0 { + fmt.Println("前 5 个节点 ID:") + for i, p := range peers[:min(5, len(peers))] { + fmt.Printf(" %d. %s\n", i+1, p.ShortString()) + } } } + } } diff --git a/internal/logic/p2p/gateway.go b/internal/logic/p2p/gateway.go index 26f4ffd..e88f5f8 100644 --- a/internal/logic/p2p/gateway.go +++ b/internal/logic/p2p/gateway.go @@ -80,9 +80,9 @@ func (s *sP2P) GatewayStart(ctx context.Context, group *ghttp.RouterGroup) (err // 处理不同类型的消息 switch msg.Type { case MsgTypeRegister: - s.handleRegister(ws, msg) + s.handleRegister(ctx, ws, msg) case MsgTypeDiscover: - s.handleDiscover(ws, msg) + s.handleDiscover(ctx, ws, msg) default: g.Log().Error(ctx, "未知消息类型: %s", msg.Type) } @@ -96,7 +96,7 @@ func (s *sP2P) GatewayStart(ctx context.Context, group *ghttp.RouterGroup) (err } // 处理注册请求 -func (s *sP2P) handleRegister(conn *websocket.Conn, msg GatewayMessage) { +func (s *sP2P) handleRegister(ctx context.Context, conn *websocket.Conn, msg GatewayMessage) { if msg.From == "" { g.Log().Error(ctx, "客户端ID不能为空") return @@ -196,7 +196,7 @@ func (s *sP2P) sendMessage(conn *websocket.Conn, msg GatewayMessage) error { } // 处理发现请求 -func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) { +func (s *sP2P) handleDiscover(ctx context.Context, conn *websocket.Conn, msg GatewayMessage) { if msg.From == "" { s.sendError(conn, "消息缺少发送方ID(from)") return diff --git a/internal/logic/p2p/p2p.go b/internal/logic/p2p/p2p.go index 3fc0ed9..8e8bd21 100644 --- a/internal/logic/p2p/p2p.go +++ b/internal/logic/p2p/p2p.go @@ -20,8 +20,8 @@ import ( ) var ( - ctx = gctx.New() - ip string + //ctx = gctx.New() + ip string ) // 常量定义 @@ -121,6 +121,7 @@ func (s *sP2P) getPublicIPAndType() (ip string, ipType string, err error) { // 只获取IPv4公网IP(过滤IPv6结果) func (s *sP2P) GetIPv4PublicIP() (string, error) { + ctx := gctx.New() // 优先使用只返回IPv4的API,避免IPv6干扰 //client := http.Client{Timeout: 5 * time.Second} diff --git a/internal/service/p_2_p.go b/internal/service/p_2_p.go index c1a5afb..9d69ebd 100644 --- a/internal/service/p_2_p.go +++ b/internal/service/p_2_p.go @@ -16,13 +16,13 @@ type ( IP2P interface { // SendP2P 发送格式化消息 SendP2P(targetID string, typ string, data []byte) (err error) - Start(ctx context.Context, wsStr string) (err error) + Start(wsStr string) (err error) // 创建libp2p主机 CreateLibp2pHost(ctx context.Context, port int) (host.Host, error) // 发现并连接目标节点 DiscoverAndConnect(targetID string) error // 初始化无服务器DHT(作为节点加入DHT网络) - DHTStart(ctx context.Context, h host.Host, bootstrapPeers []string) (err error) + DHTStart(h host.Host, bootstrapPeers []string) (err error) // StoreToDHT 存储数据到 DHT(自动分布式存储) StoreToDHT(ctx context.Context, key string, value string) (err error) // FindFromDHT 从 DHT 查找数据(从网络节点获取)