diff --git a/.gitignore b/.gitignore index d222719..20aa7b4 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ temp.yaml bin **/config/config.yaml v1.0.0/ +manifest/config/local.yaml diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 3af272f..a9a2719 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -2,17 +2,19 @@ package cmd import ( "context" - "fmt" "time" + "github.com/ayflying/p2p/internal/controller/p2p" "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/gcmd" "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/gtimer" ) func init() { - err := Main.AddCommand(&Main, &Debug, &P2p, &DHT) + err := Main.AddCommand(&Main, &Debug, &Update) if err != nil { g.Log().Error(gctx.GetInitCtx(), err) return @@ -32,66 +34,50 @@ var ( parser, err = gcmd.Parse(g.MapStrBool{ "p,port": true, }) - port := parser.GetOpt("port", "23333").Int() + //port := parser.GetOpt("port", "23333").Int() - h, _ := service.P2P().CreateLibp2pHost(gctx.New(), port) - err = service.P2P().DHTStart(h, nil) - if err != nil { - g.Log().Error(ctx, err) - } + parser, err = gcmd.Parse(g.MapStrBool{ + "w,ws": true, + "g,gateway": true, + "p,port": true, + }) + addr := g.Cfg().MustGet(ctx, "ws.address").String() + ws := parser.GetOpt("ws", addr).String() + //port := parser.GetOpt("port", 0).Int() - time.Sleep(5 * time.Second) - publicIp, _ := service.P2P().GetIPv4PublicIP() - validKey := fmt.Sprintf("%v/ip", h.ID()) - dataValue := fmt.Sprintf("来自节点 %s 的数据:%v", h.ID().ShortString(), publicIp) - if err = service.P2P().StoreToDHT(gctx.New(), validKey, dataValue); err != nil { - g.Log().Debugf(ctx, "❌ 存储失败: %v\n", err) - } else { - g.Log().Debugf(ctx, "✅ 存储成功\nKey: %s\nValue: %s\n", validKey, dataValue) - } + s.Group("/", func(group *ghttp.RouterGroup) { + group.Middleware(ghttp.MiddlewareHandlerResponse) + group.Bind( + p2p.NewV1(), + ) + }) - //parser, err = gcmd.Parse(g.MapStrBool{ - // "w,ws": true, - // "g,gateway": true, - // "p,port": true, - //}) - //addr := g.Cfg().MustGet(ctx, "ws.address").String() - //ws := parser.GetOpt("ws", addr).String() - ////port := parser.GetOpt("port", 0).Int() - // - //s.Group("/", func(group *ghttp.RouterGroup) { - // group.Middleware(ghttp.MiddlewareHandlerResponse) - // group.Bind( - // p2p.NewV1(), - // ) - //}) - // - ////启动p2p服务端网关 - //s.Group("/ws", func(group *ghttp.RouterGroup) { - // group.Middleware(ghttp.MiddlewareHandlerResponse) - // err = service.P2P().GatewayStart(ctx, group) - // if err != nil { - // g.Log().Error(ctx, err) - // } - //}) - // - ////s.SetPort(port) - // - //// 延迟启动 - //gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { - // g.Log().Debug(ctx, "开始执行客户端") - // // 启动p2p客户端 - // err = service.P2P().Start(ws) - // - // g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) - // //addrs, _ := net.InterfaceAddrs() - // //for _, addr := range addrs { - // // if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { - // // g.Log().Infof(ctx, "访问地址:http://%v:%d", ipnet.IP.String(), s.GetListenedPort()) - // // } - // //} - // - //}) + //启动p2p服务端网关 + s.Group("/ws", func(group *ghttp.RouterGroup) { + group.Middleware(ghttp.MiddlewareHandlerResponse) + err = service.P2P().GatewayStart(ctx, group) + if err != nil { + g.Log().Error(ctx, err) + } + }) + + //s.SetPort(port) + + // 延迟启动 + gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { + g.Log().Debug(ctx, "开始执行客户端") + // 启动p2p客户端 + err = service.P2P().Start(ws) + + g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) + //addrs, _ := net.InterfaceAddrs() + //for _, addr := range addrs { + // if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { + // g.Log().Infof(ctx, "访问地址:http://%v:%d", ipnet.IP.String(), s.GetListenedPort()) + // } + //} + + }) s.Run() return nil diff --git a/internal/cmd/debug.go b/internal/cmd/debug.go index 0526ada..0df4f31 100644 --- a/internal/cmd/debug.go +++ b/internal/cmd/debug.go @@ -2,11 +2,13 @@ package cmd import ( "context" + "fmt" + "os" - "github.com/ayflying/p2p/internal/service" "github.com/dop251/goja" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcmd" + "github.com/gogf/gf/v2/os/gfile" ) type DebugType struct { @@ -22,7 +24,7 @@ var ( Usage: "debug", Brief: "调试接口", Func: func(ctx context.Context, parser *gcmd.Parser) (err error) { - g.Log().Debug(ctx, "开始执行debug") + g.Log().Debug(ctx, "开始执行debug v1.0.5") g.Log().SetConfigWithMap(g.Map{ "level": "all", @@ -59,10 +61,63 @@ var ( // } // g.Dump(host.ID().String(), host.Addrs()) case "update": - service.OS().Update("v1.0.0", "http://127.0.0.1:8080") + url := "http://ay.cname.com:5244/d/guest/novapps/%E5%88%86%E5%B8%83%E5%BC%8F/p2p/p2p.exe?sign=8anTHvfJKJLCfZTI4IuopNK38x9rEoDiNevr5aZZPgM=:0" + g.Log().Debugf(ctx, "当前开始更新了,url=%v", url) + //service.OS().Update("v1.0.0", "http://127.0.0.1:8080") + + resp, err := g.Client().Get(ctx, url) + if err != nil { + g.Log().Error(ctx, err) + } + //filename := g.Cfg("hack").MustGet(ctx, "gfcli.build.name").String() + filename := gcmd.GetArg(0).String() + + _, err = renameRunningFile(filename) + if err != nil { + g.Log().Error(ctx, err) + } + + //switch runtime.GOOS { + //case "windows": + // fmt.Println("当前系统:Windows") + // filename = filename + ".exe" + // if gfile.Exists(filename) { + // filename += "~" + // } + //default: + // fmt.Println("当前系统:" + runtime.GOOS) + //} + //if gfile.Exists(filename) { + // filename += "~" + //} + err = gfile.PutBytes(filename, resp.ReadAll()) + if err != nil { + g.Log().Error(ctx, err) + } + msg = "下载完成了" } g.Log().Debug(ctx, msg) return }, } ) + +// 重命名正在运行的程序文件(如 p2p.exe → p2p.exe~) +func renameRunningFile(exePath string) (string, error) { + // 目标备份文件名(p2p.exe → p2p.exe~) + backupPath := exePath + "~" + + // 先删除已存在的备份文件(若有) + if _, err := os.Stat(backupPath); err == nil { + if err := os.Remove(backupPath); err != nil { + return "", fmt.Errorf("删除旧备份文件失败: %v", err) + } + } + + // 重命名正在运行的 exe 文件 + // 关键:Windows 允许对锁定的文件执行重命名操作 + if err := os.Rename(exePath, backupPath); err != nil { + return "", fmt.Errorf("重命名运行中文件失败: %v", err) + } + return backupPath, nil +} diff --git a/internal/cmd/p2p.go b/internal/cmd/p2p.go index 8c9057f..581b9bb 100644 --- a/internal/cmd/p2p.go +++ b/internal/cmd/p2p.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "time" "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/frame/g" @@ -41,7 +42,7 @@ var ( Description: p2pHelpDescription, // Func 为命令的执行函数,接收上下文和参数解析器 Func: func(ctx context.Context, parser *gcmd.Parser) (err error) { - g.Log().Debug(ctx, "开始执行p2p") + g.Log().Debug(ctx, "开始执行p2p v1.0.3") s := g.Server() @@ -82,7 +83,20 @@ var ( g.Log().Debug(ctx, "开始执行dht") h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) - err := service.P2P().DHTStart(h, nil) + addr := []string{ + "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + + //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + + //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + //"/ip4/114.132.176.115/tcp/23333/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + } + + err := service.P2P().DHTStart(h, addr) if err != nil { g.Log().Error(ctx, err) } @@ -95,17 +109,26 @@ var ( } else { fmt.Printf("✅ 存储成功\nKey: %s\nValue: %s\n", validKey, dataValue) } - + s.SetPort(0) case "dht2": g.Log().Debug(ctx, "开始执行dht2") h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) //addr := []string{ - // "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - // "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // //"/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // //"/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", // - // "/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - // "/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // //肖晓 + // "/ip4/192.168.50.244/tcp/23333/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + // "/ip4/192.168.50.244/udp/23333/quic-v1/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + // + // //廖玉龙 + // "/ip4/192.168.50.210/tcp/23333/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + // "/ip4/192.168.50.210/udp/23333/quic-v1/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + // + // + // //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", // // //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", // //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", @@ -113,13 +136,16 @@ var ( // //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", //} - id := gcmd.GetOpt("id").String() - err := service.P2P().DHTStart(h, nil) + addrVar, err := g.Cfg().Get(ctx, "dht.addrs") + addr := addrVar.Strings() + + validKey := gcmd.GetOpt("id").String() + err = service.P2P().DHTStart(h, addr) if err != nil { g.Log().Error(ctx, err) } - validKey := id go func() { + time.Sleep(30 * time.Second) // 5. 查找数据(从网络中的节点获取,不依赖初始 Bootstrap 节点) foundValue, err := service.P2P().FindFromDHT(ctx, validKey) if err != nil { diff --git a/internal/cmd/update.go b/internal/cmd/update.go new file mode 100644 index 0000000..5f8f11f --- /dev/null +++ b/internal/cmd/update.go @@ -0,0 +1,42 @@ +package cmd + +import ( + "context" + "fmt" + "path" + "runtime" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gcfg" + "github.com/gogf/gf/v2/os/gcmd" +) + +var ( + Update = gcmd.Command{ + Name: "update", + Usage: "update", + Brief: "更新版本", + Func: func(ctx context.Context, parser *gcmd.Parser) (err error) { + g.Log().Info(ctx, "准备上传更新文件") + //加载编辑配置文件 + g.Cfg("hack").GetAdapter().(*gcfg.AdapterFile).SetFileName("hack/config.yaml") + //获取文件名 + getFileName, err := g.Cfg("hack").Get(ctx, "gfcli.build.name") + filename := getFileName.String() + + getPath, err := g.Cfg("hack").Get(ctx, "gfcli.build.path") + pathMain := getPath.String() + + //获取版本号 + getVersion, err := g.Cfg("hack").Get(ctx, "gfcli.build.version") + version := getVersion.String() + + // 拼接操作系统和架构(格式:OS_ARCH) + platform := fmt.Sprintf("%s_%s", runtime.GOOS, runtime.GOARCH) + + var filePath = path.Join(pathMain, version, platform, filename) + + g.Log().Debugf(ctx, "当前获取到的地址为:%v", filePath) + return + }} +) diff --git a/internal/logic/p2p/client.go b/internal/logic/p2p/client.go index bb228ad..a5f7c7d 100644 --- a/internal/logic/p2p/client.go +++ b/internal/logic/p2p/client.go @@ -15,7 +15,6 @@ import ( "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtimer" "github.com/gogf/gf/v2/util/grand" - "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" @@ -23,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/multiformats/go-multiaddr" ) @@ -82,7 +82,7 @@ func (s *sP2P) Start(wsStr string) (err error) { // 创建客户端实例 s.client = &Client{ ctx: ctx, - Id: uuid.New().String(), + Id: hostObj.ID().String(), gatewayURL: wsStr, host: hostObj, peers: make(map[string]peer.ID), @@ -114,6 +114,7 @@ func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error port = grand.N(50000, 55000) //port = 53533 } + // 配置监听地址 //listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port) var listenAddrs = []string{ @@ -122,25 +123,35 @@ func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error } // 1. 生成密钥对并初始化节点(确保身份有效) - //s.privKey, _, _ = crypto.GenerateKeyPair(crypto.Ed25519, 0) // 推荐使用Ed25519 s.privKey, _ = s.generateFixedKey() + // 3. 手动创建并挂载连接管理器(v0.43.0兼容) + connMgr, err := connmgr.NewConnManager( + 100, // LowWater:连接数低于此值时不主动断开 + 500, // HighWater:连接数高于此值时主动清理无效连接 + connmgr.WithGracePeriod(30*time.Second), // 宽限期:新连接30秒内不被清理 + ) + // 创建主机 h, err := libp2p.New( libp2p.ListenAddrStrings(listenAddrs...), libp2p.Identity(s.privKey), + libp2p.EnableRelay(), // 启用中继兜底 + // 关键:通过WithConnManager选项注入连接管理器 + libp2p.ConnectionManager(connMgr), libp2p.DefaultTransports, libp2p.DefaultMuxers, libp2p.DefaultSecurity, - // 增加NAT端口映射尝试时间 - //libp2p.NATPortMapTimeout(30*time.Second), - - //libp2p.DisableRelay(), // 禁用Relay(如果需要中继,可保留) libp2p.NATPortMap(), // 自动尝试路由器端口映射(跨网络必备) ) if err != nil { return nil, err } + + if err != nil { + return nil, fmt.Errorf("创建ConnManager失败: %v", err) + } + g.Log().Debugf(ctx, "当前p2p的分享地址:%v", h.Addrs()) return h, err diff --git a/internal/logic/p2p/dht.go b/internal/logic/p2p/dht.go index a92865a..45023d2 100644 --- a/internal/logic/p2p/dht.go +++ b/internal/logic/p2p/dht.go @@ -3,19 +3,19 @@ package p2p import ( "context" "fmt" - "log" "net" + "strings" "time" "github.com/gogf/gf/v2/crypto/gsha1" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" + "github.com/libp2p/go-libp2p/core/peerstore" //"github.com/ipfs/boxo/ipns" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" ) @@ -40,11 +40,19 @@ func (s *sP2P) DHTStart(h host.Host, bootstrapPeers []string) (err error) { if len(bootstrapPeers) == 0 { bootstrapPeers = []string{ - "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + //"/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + //"/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // + //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - "/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - "/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //肖晓 + "/ip4/192.168.50.244/tcp/23333/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + "/ip4/192.168.50.244/udp/23333/quic-v1/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + + //廖玉龙 + "/ip4/192.168.50.210/tcp/23333/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + "/ip4/192.168.50.210/udp/23333/quic-v1/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", } } @@ -54,9 +62,11 @@ func (s *sP2P) DHTStart(h host.Host, bootstrapPeers []string) (err error) { // 2. 通过官方 Bootstrap 节点加入公共 DHT 网络(完全去中心化入口) s.dht.KadDHT, err = s.joinGlobalDHT(ctx, h) if err != nil { - log.Fatalf("加入 DHT 网络失败: %v", err) + g.Log().Infof(ctx, "加入 DHT 网络失败: %v", err) + g.Log().Info(ctx, "开启私有节点服务端等待中...") + return } - fmt.Println("✅ 成功启动完全去中心化 DHT 网络") + g.Log().Debug(ctx, "✅ 成功启动完全去中心化 DHT 网络") // 3. 定期打印路由表(观察节点自动发现效果) go s.printRoutingTable(ctx, s.dht.KadDHT, 60*time.Second) @@ -77,41 +87,42 @@ type NoOpValidator struct{} // Validate 总是返回成功,允许任何数据 func (v *NoOpValidator) Validate(key string, value []byte) error { - g.Log().Debugf(gctx.New(), "当前有数据进行保存:key: %s, value: %s", key, value) + + // 1. 检查key是否以 /ay/ 开头 + if !strings.HasPrefix(key, "/ay/") { + return fmt.Errorf("拒绝存储:key必须以 /ay/ 开头,当前key为 %s", key) + } + + g.Log().Debugf(gctx.New(), "外部数据进行保存:key: %s, value: %s", key, value) + // 限制数据大小(防止超大数据占用资源) if len(value) > 1024*1024 { // 1MB上限 return fmt.Errorf("数据超过1MB,拒绝存储") } + return nil } // Select 简单返回第一个数据(不做版本选择) func (v *NoOpValidator) Select(key string, values [][]byte) (int, error) { + g.Log().Debugf(gctx.New(), "外部数据进行选择:key: %s, values: %v", key, values) return 0, nil } -// 清空 Peerstore 中的所有节点缓存(替代 Clear() 方法) -func (s *sP2P) clearPeerstore(ps peerstore.Peerstore) { - // 1. 获取所有缓存的节点 ID - peers := ps.Peers() - for _, p := range peers { - // 2. 删除该节点的所有地址 - ps.ClearAddrs(p) - // 3. 移除该节点的所有元数据(如协议、密钥等) - ps.RemovePeer(p) - } - fmt.Println("✅ Peerstore 缓存已清空(旧节点地址已删除)") -} - // 加入全球公共 DHT 网络(通过官方 Bootstrap 节点,实现完全去中心化) func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.IpfsDHT, error) { - // 关键:启动后先清空 Peerstore 缓存(删除旧公网节点) - s.clearPeerstore(localHost.Peerstore()) + // 2. 基于Host创建IpfsDHT实例(关键步骤) + // 注意:需指定模式(Full/Client),私有网络中Bootstrap节点用Full模式,普通节点用Client模式 + dhtOpts := []dht.Option{ + dht.Mode(dht.ModeClient), // 普通节点用Client模式(轻量) + // dht.Mode(dht.ModeServer), // Bootstrap节点用Full模式(存储完整路由表) + } // 创建 DHT 实例(ModeServer:作为完整节点参与存储和路由) kadDHT, err := dht.New( ctx, - localHost, dht.Mode(dht.ModeServer), + localHost, + dhtOpts..., ) if err != nil { return nil, err @@ -140,6 +151,11 @@ func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.Ipf } } + if !success { + g.Log().Debug(ctx, "所有本地种子节点连接失败,私有网络启动失败") + return nil, fmt.Errorf("所有本地种子节点连接失败") // 连接失败时终止DHT启动 + } + //if !success { // // 连接 libp2p 官方 Bootstrap 节点(仅作为初始入口) // officialBootstrapPeers := dht.DefaultBootstrapPeers // 官方节点列表 @@ -174,8 +190,22 @@ func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.Ipf // } //} + //// 4. 执行Bootstrap,加入私有网络 + //bootstrapCfg := bootstrap.BootstrapConfig{ + // BootstrapPeers: func() []peer.AddrInfo { + // seedPeers, _ := s.parseSeedNodes(s.dht.bootstrapPeers) + // return seedPeers + // }, // 私有Bootstrap节点列表 + // //MinPeers: 1, // 至少连接1个Bootstrap节点 + // Period: 30 * time.Second, // 禁用定期重连 + // //ConnectionMgr: connMgr, // 关联连接管理器 + //} + //if _, err = bootstrap.Bootstrap(localHost.ID(), localHost, kadDHT, bootstrapCfg); err != nil { + // return nil, fmt.Errorf("节点Bootstrap失败: %v", err) + //} + // 启动 DHT(自动发现其他节点,构建路由表,脱离对官方节点的依赖,带超时,避免阻塞) - bootCtx, bootCancel := context.WithTimeout(ctx, 30*time.Second) + bootCtx, bootCancel := context.WithTimeout(ctx, 60*time.Second) err = kadDHT.Bootstrap(bootCtx) bootCancel() if err != nil { @@ -215,7 +245,7 @@ func (s *sP2P) FindFromDHT(ctx context.Context, key string) (string, error) { g.Log().Debugf(ctx, "FindFromDHT key: %s", key) // 1. 先检查本地是否存储了数据(本地节点可能已保存) - localCtx, localCancel := context.WithTimeout(ctx, 5*time.Second) + localCtx, localCancel := context.WithTimeout(ctx, 10*time.Second) defer localCancel() localValue, err := s.dht.KadDHT.GetValue(localCtx, key) if err == nil { @@ -226,8 +256,6 @@ func (s *sP2P) FindFromDHT(ctx context.Context, key string) (string, error) { // 2. 多次重试网络查找 for i := 0; i < maxRetries; i++ { - //ctx2, cancel := context.WithTimeout(ctx, 120*time.Second) // 本地测试超时短一些 - //defer cancel() findCtx, findCancel := context.WithTimeout(ctx, 60*time.Second) g.Log().Debugf(findCtx, "🔍 第%d次查找(共%d次)...", i+1, maxRetries) value, err2 := s.dht.KadDHT.GetValue(findCtx, key) @@ -267,32 +295,17 @@ func (s *sP2P) printRoutingTable(ctx context.Context, kadDHT *dht.IpfsDHT, inter } } -//// 定期打印节点状态(公网地址+路由表) -//func (s *sP2P) printStatus(interval time.Duration) { -// ticker := time.NewTicker(interval) -// for { -// <-ticker.C -// //publicIp, err := service.P2P().GetIPv4PublicIP() -// //publicAddrs := s.getPublicAddrs() -// peers := s.dht.KadDHT.RoutingTable().ListPeers() -// fmt.Printf("\n===== 节点状态 =====") -// fmt.Printf("\n公网地址数: %d(0表示穿透失败)\n", len(publicAddrs)) -// fmt.Printf("路由表节点数: %d(自动扩散结果)\n", len(peers)) -// fmt.Println("====================") -// } -//} - // 打印节点地址(供其他节点手动加入时使用) func (s *sP2P) printNodeAddrs(host host.Host) { fmt.Println("节点地址(公网地址将自动同步到DHT):") for _, addr := range host.Addrs() { fullAddr := fmt.Sprintf("%s/p2p/%s", addr, host.ID()) ipStr, _ := addr.ValueForProtocol(multiaddr.P_IP4) - ip := net.ParseIP(ipStr) - if ip.IsPrivate() || ip.IsLoopback() { - fmt.Printf(" [内网] %s\n", fullAddr) + ipObj := net.ParseIP(ipStr) + if ipObj.IsPrivate() || ipObj.IsLoopback() { + fmt.Printf("%s\n", fullAddr) } else { - fmt.Printf(" [公网] %s\n", fullAddr) + fmt.Printf("%s\n", fullAddr) } } }