From 31d1b3d27ed8cd5fa2533c95fced249080a67088 Mon Sep 17 00:00:00 2001 From: ayflying Date: Thu, 30 Oct 2025 11:13:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BB=A3=E7=90=86=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.MD | 8 +-- api/p2p/v1/p2p.go | 13 +++- internal/cmd/debug.go | 6 +- internal/cmd/p2p.go | 52 +++++++------- internal/cmd/update.go | 2 +- .../controller/system/system_v1_update.go | 2 +- internal/logic/p2p/client.go | 59 +++++----------- internal/logic/p2p/dht.go | 22 +++--- internal/logic/p2p/p2p.go | 5 +- internal/logic/p2p/proxy.go | 65 +++++++++++++++--- internal/logic/system/update.go | 4 +- internal/message/consts.go | 18 +++++ internal/message/http/http.go | 17 +++++ internal/message/proxy/proxy.go | 68 +++++++++++++++++++ internal/message/proxyAck/proxyAck.go | 28 ++++++++ internal/service/p_2_p.go | 3 + internal/service/system.go | 2 +- runtime/message.key | 4 ++ 18 files changed, 274 insertions(+), 104 deletions(-) create mode 100644 internal/message/consts.go create mode 100644 internal/message/http/http.go create mode 100644 internal/message/proxy/proxy.go create mode 100644 internal/message/proxyAck/proxyAck.go create mode 100644 runtime/message.key diff --git a/README.MD b/README.MD index 07585ec..838bc0e 100644 --- a/README.MD +++ b/README.MD @@ -30,7 +30,7 @@ ```bash git clone https://github.com/ayflying/p2p.git -cd p2p +cd message ``` ### 安装依赖 @@ -42,13 +42,13 @@ go mod tidy ### 构建项目 ```bash -go build -o p2p +go build -o message ``` ### 运行项目 ```bash -./p2p +./message ``` ## 目录结构 @@ -90,7 +90,7 @@ go build -o p2p 项目提供了命令行工具,支持节点管理、网络状态查询等功能。 ```bash -./p2p --help # 查看帮助信息 +./message --help # 查看帮助信息 ``` ## 开发指南 diff --git a/api/p2p/v1/p2p.go b/api/p2p/v1/p2p.go index 4e73ee5..421e6e2 100644 --- a/api/p2p/v1/p2p.go +++ b/api/p2p/v1/p2p.go @@ -3,7 +3,7 @@ package v1 import "github.com/gogf/gf/v2/frame/g" type ConnectReq struct { - g.Meta `path:"/p2p/connect" tags:"p2p" method:"get" sm:"连接到目标主机"` + g.Meta `path:"/message/connect" tags:"message" method:"get" sm:"连接到目标主机"` TargetID string `json:"id"` } type ConnectRes struct { @@ -11,7 +11,7 @@ type ConnectRes struct { } type SendReq struct { - g.Meta `path:"/p2p/send" tags:"p2p" method:"get" sm:"发送消息"` + g.Meta `path:"/message/send" tags:"message" method:"get" sm:"发送消息"` TargetID string `json:"id"` Data string `json:"data"` } @@ -20,8 +20,15 @@ type SendRes struct { } type IpReq struct { - g.Meta `path:"/p2p/ip" tags:"p2p" method:"get" sm:"获取当前主机的IP地址"` + g.Meta `path:"/message/ip" tags:"message" method:"get" sm:"获取当前主机的IP地址"` } type IpRes struct { g.Meta `mime:"text/html" example:"string"` } + +type Message struct { + Type string `json:"type" dc:"消息类型"` + //Port int `json:"port,omitempty" dc:"请求端口"` + Data []byte `json:"data" dc:"消息数据"` + From string `json:"from" dc:"发送方ID"` +} diff --git a/internal/cmd/debug.go b/internal/cmd/debug.go index 0df4f31..c7adc5b 100644 --- a/internal/cmd/debug.go +++ b/internal/cmd/debug.go @@ -54,7 +54,7 @@ var ( } msg = res.Export() g.Dump(res.ToNumber()) - case "p2p": + case "message": // host, err := service.P2P().Start(ctx) // if err != nil { // break @@ -102,9 +102,9 @@ var ( } ) -// 重命名正在运行的程序文件(如 p2p.exe → p2p.exe~) +// 重命名正在运行的程序文件(如 message.exe → message.exe~) func renameRunningFile(exePath string) (string, error) { - // 目标备份文件名(p2p.exe → p2p.exe~) + // 目标备份文件名(message.exe → message.exe~) backupPath := exePath + "~" // 先删除已存在的备份文件(若有) diff --git a/internal/cmd/p2p.go b/internal/cmd/p2p.go index f70b0fb..a67abcb 100644 --- a/internal/cmd/p2p.go +++ b/internal/cmd/p2p.go @@ -17,15 +17,15 @@ P2P连接工具使用帮助: 模式1: 网关服务器 功能: 拥有外网IP,接收客户端连接,协助P2P打洞 - 命令: p2p -a gateway + 命令: message -a gateway 模式2: 客户端 功能: 连接到网关,通过打洞实现与其他客户端的长连接通讯 - 命令: p2p -a client --gateway 网关ID + 命令: message -a client --gateway 网关ID 高级功能: - 客户端间连接: p2p --mode client --gateway 网关ID --action connect --target 目标客户端ID - 发送消息: p2p -mode client --gateway 网关ID --action send --target 目标客户端ID --message "消息内容" + 客户端间连接: message --mode client --gateway 网关ID --action connect --target 目标客户端ID + 发送消息: message -mode client --gateway 网关ID --action send --target 目标客户端ID --message "消息内容" ` var ( @@ -33,9 +33,9 @@ var ( // 遵循GoFrame的Command对象定义规范,包含名称、用法、简短描述和执行函数 P2p = gcmd.Command{ // Name 为命令名称 - Name: "p2p", + Name: "message", // Usage 描述命令的基本用法 - Usage: "p2p [options]", + Usage: "message [options]", // Brief 提供命令的简短功能描述 Brief: "P2P连接工具,支持网关和客户端模式,实现NAT穿透和点对点通信", // Description 提供命令的详细描述和使用帮助 @@ -81,16 +81,16 @@ var ( h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) addr := []string{ - "/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - "/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + "/ip4/192.168.50.243/tcp/23333/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + "/ip4/192.168.50.243/udp/23333/quic-v1/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/tcp/23333/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/udp/23333/quic-v1/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - //"/ip4/114.132.176.115/tcp/23333/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", - //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + //"/ip4/192.168.50.173/tcp/23333/message/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + //"/ip4/192.168.50.173/udp/23333/quic-v1/message/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + //"/ip4/114.132.176.115/tcp/23333/message/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + //"/ip4/114.132.176.115/udp/23333/quic-v1/message/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", } err := service.P2P().DHTStart(h, addr) @@ -112,25 +112,25 @@ var ( h, _ := service.P2P().CreateLibp2pHost(ctx, 23333) //addr := []string{ - // //"/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - // //"/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // //"/ip4/192.168.50.243/tcp/23333/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + // //"/ip4/192.168.50.243/udp/23333/quic-v1/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", // // //肖晓 - // "/ip4/192.168.50.244/tcp/23333/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", - // "/ip4/192.168.50.244/udp/23333/quic-v1/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + // "/ip4/192.168.50.244/tcp/23333/message/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + // "/ip4/192.168.50.244/udp/23333/quic-v1/message/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", // // //廖玉龙 - // "/ip4/192.168.50.210/tcp/23333/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", - // "/ip4/192.168.50.210/udp/23333/quic-v1/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + // "/ip4/192.168.50.210/tcp/23333/message/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + // "/ip4/192.168.50.210/udp/23333/quic-v1/message/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", // // - // //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - // //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // //"/ip4/192.168.50.173/tcp/23333/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + // //"/ip4/192.168.50.173/udp/23333/quic-v1/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", // - // //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - // //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", - // //"/ip4/114.132.176.115/tcp/23333/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", - // //"/ip4/114.132.176.115/udp/23333/quic-v1/p2p/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + // //"/ip4/192.168.50.173/tcp/23333/message/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + // //"/ip4/192.168.50.173/udp/23333/quic-v1/message/12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26", + // //"/ip4/114.132.176.115/tcp/23333/message/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", + // //"/ip4/114.132.176.115/udp/23333/quic-v1/message/12D3KooWJQMiYyptqSrx4PPsGLY9hjLbaDdxmBXmGtKmSWuiP79D", //} addrVar, err := g.Cfg().Get(ctx, "dht.addrs") diff --git a/internal/cmd/update.go b/internal/cmd/update.go index bc9fc43..d7630e0 100644 --- a/internal/cmd/update.go +++ b/internal/cmd/update.go @@ -79,7 +79,7 @@ var ( g.Log().Debugf(ctx, "当前获取到的地址为:%v", filePath) versionUrl := service.S3().GetCdnUrl(path.Join(rootDir, name)) - listVar := g.Cfg().MustGet(ctx, "p2p.list") + listVar := g.Cfg().MustGet(ctx, "message.list") var p2pItem []struct { Host string `json:"host"` Port int `json:"port"` diff --git a/internal/controller/system/system_v1_update.go b/internal/controller/system/system_v1_update.go index 4825ce0..d9fd213 100644 --- a/internal/controller/system/system_v1_update.go +++ b/internal/controller/system/system_v1_update.go @@ -45,7 +45,7 @@ func (c *ControllerV1) Update(ctx context.Context, req *v1.UpdateReq) (res *v1.U Name string `json:"name"` } - //var GatewayMessage *p2p.GatewayMessage + //var GatewayMessage *message.GatewayMessage var msgData = struct { Files []*DataType `json:"files"` diff --git a/internal/logic/p2p/client.go b/internal/logic/p2p/client.go index 838b5d2..388d747 100644 --- a/internal/logic/p2p/client.go +++ b/internal/logic/p2p/client.go @@ -9,14 +9,15 @@ import ( "strconv" "time" + v1 "github.com/ayflying/p2p/api/p2p/v1" + messageFunc "github.com/ayflying/p2p/internal/message" "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/os/gcache" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gfile" "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/os/gtimer" "github.com/gogf/gf/v2/util/grand" "github.com/gorilla/websocket" @@ -41,19 +42,12 @@ type Client struct { //tcp map[string] } -type Message struct { - Type string `json:"type" dc:"消息类型"` - Port int `json:"port,omitempty" dc:"请求端口"` - Data []byte `json:"data" dc:"消息数据"` - From string `json:"from" dc:"发送方ID"` -} - // SendP2P 发送格式化消息 func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) { if typ == "" { typ = "message" } - message := &Message{ + message := &v1.Message{ Type: typ, From: s.client.Id, Data: data, @@ -199,6 +193,14 @@ func (s *sP2P) register() error { // 发现并连接目标节点 func (s *sP2P) DiscoverAndConnect(targetID string) error { + cacheKey := fmt.Sprintf("%s:%s", s.client.Id, targetID) + get, _ := gcache.Get(gctx.New(), cacheKey) + if !get.IsEmpty() { + return nil + } + // 设置缓存,避免重复发现 + gcache.Set(gctx.New(), cacheKey, targetID, 30*time.Second) + // 发送发现请求 msg := GatewayMessage{ Type: MsgTypeDiscover, @@ -247,45 +249,18 @@ func (s *sP2P) handleStream(stream network.Stream) { } // 解析消息 - var message *Message + var message *v1.Message if err := gjson.DecodeTo(msg, &message); err != nil { g.Log().Debugf(ctx, "解析消息失败: %v", msg) } //g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message)) - switch message.Type { - case "proxy": - var data *ProxyType - gjson.DecodeTo(message.Data, &data) - //g.Dump(data) - // Client - for { - if conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)); err == nil { - if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil { - fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) - err = s.SendP2P(message.From, "proxy_ack", gjson.MustEncode(&ProxyType{ - Ip: ip, - Port: message.Port, - Data: b, - })) - } else { - fmt.Println(err) - } - conn.Close() - } else { - //glog.Error(err) - } - //time.Sleep(time.Second) + if this, ok := messageFunc.Run[message.Type]; ok { + err := this.Message(message) + if err != nil { + g.Log().Error(ctx, err) } - - //conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)) - //if err != nil { - // g.Log().Errorf(ctx, "连接失败:%v", err) - // return - //} - //defer conn.Close() - } } diff --git a/internal/logic/p2p/dht.go b/internal/logic/p2p/dht.go index 45023d2..6beacf9 100644 --- a/internal/logic/p2p/dht.go +++ b/internal/logic/p2p/dht.go @@ -26,8 +26,8 @@ type DHTType struct { var ( //bootstrapPeers = []string{ -// "/ip4/192.168.50.173/tcp/53486/p2p/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw", -// "/ip4/192.168.50.173/udp/53486/quic-v1/p2p/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw", +// "/ip4/192.168.50.173/tcp/53486/message/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw", +// "/ip4/192.168.50.173/udp/53486/quic-v1/message/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw", //} ) @@ -40,19 +40,19 @@ func (s *sP2P) DHTStart(h host.Host, bootstrapPeers []string) (err error) { if len(bootstrapPeers) == 0 { bootstrapPeers = []string{ - //"/ip4/192.168.50.243/tcp/23333/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", - //"/ip4/192.168.50.243/udp/23333/quic-v1/p2p/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + //"/ip4/192.168.50.243/tcp/23333/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", + //"/ip4/192.168.50.243/udp/23333/quic-v1/message/12D3KooWESZtrm6AfqhC3oj5FsAbcSmePwHFFip3F2MPExrxHxwy", // - //"/ip4/192.168.50.173/tcp/23333/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", - //"/ip4/192.168.50.173/udp/23333/quic-v1/p2p/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/tcp/23333/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", + //"/ip4/192.168.50.173/udp/23333/quic-v1/message/12D3KooWKgW8WxncYzZ2h5erMbK3GfLGhNHFapPvhUc1KVmdZeRg", //肖晓 - "/ip4/192.168.50.244/tcp/23333/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", - "/ip4/192.168.50.244/udp/23333/quic-v1/p2p/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + "/ip4/192.168.50.244/tcp/23333/message/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", + "/ip4/192.168.50.244/udp/23333/quic-v1/message/12D3KooWFAt3hTi2SaYNty4gxxBnLRFxJidRDcf4k8HqCUZZRY1W", //廖玉龙 - "/ip4/192.168.50.210/tcp/23333/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", - "/ip4/192.168.50.210/udp/23333/quic-v1/p2p/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + "/ip4/192.168.50.210/tcp/23333/message/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", + "/ip4/192.168.50.210/udp/23333/quic-v1/message/12D3KooWM8eE3i2EWB2wFVGM1URusBPHJrEQJGxKfKgPdxEMm9hn", } } @@ -299,7 +299,7 @@ func (s *sP2P) printRoutingTable(ctx context.Context, kadDHT *dht.IpfsDHT, inter func (s *sP2P) printNodeAddrs(host host.Host) { fmt.Println("节点地址(公网地址将自动同步到DHT):") for _, addr := range host.Addrs() { - fullAddr := fmt.Sprintf("%s/p2p/%s", addr, host.ID()) + fullAddr := fmt.Sprintf("%s/message/%s", addr, host.ID()) ipStr, _ := addr.ValueForProtocol(multiaddr.P_IP4) ipObj := net.ParseIP(ipStr) if ipObj.IsPrivate() || ipObj.IsLoopback() { diff --git a/internal/logic/p2p/p2p.go b/internal/logic/p2p/p2p.go index 7aad023..fd48711 100644 --- a/internal/logic/p2p/p2p.go +++ b/internal/logic/p2p/p2p.go @@ -26,7 +26,7 @@ var ( // 常量定义 const ( - ProtocolID string = "/ay/p2p/1.0.0" + ProtocolID string = "/ay/message/1.0.0" //DefaultPort = 51888 ) @@ -62,6 +62,7 @@ type sP2P struct { dht *DHTType privKey crypto.PrivKey client *Client + IdLock map[string]sync.Mutex } // New 创建一个新的 P2P 服务实例 @@ -189,7 +190,7 @@ func (s *sP2P) removeDuplicates(strs []string) []string { // 生成固定密钥(核心:通过固定种子生成相同密钥) func (s *sP2P) generateFixedKey() (crypto.PrivKey, error) { - privKeyPath := "runtime/p2p.key" + privKeyPath := "runtime/message.key" if ok := gfile.Exists(privKeyPath); ok { // 从文件读取密钥 keyBytes := gfile.GetBytes(privKeyPath) diff --git a/internal/logic/p2p/proxy.go b/internal/logic/p2p/proxy.go index 6a1a009..5f51b4a 100644 --- a/internal/logic/p2p/proxy.go +++ b/internal/logic/p2p/proxy.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "time" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" @@ -16,22 +17,50 @@ type ProxyType struct { } func (s *sP2P) ProxyInit() { - type cfgType struct { + type tcpCfgType struct { Key string `json:"key"` Ip string `json:"ip"` Port int `json:"port"` LocalPort int `json:"local_port"` } - var cfgList []*cfgType - proxyCfg, err := g.Cfg("proxy").Get(gctx.New(), "tcp") + var tcpCfgList []*tcpCfgType + tcpCfg, err := g.Cfg("proxy").Get(gctx.New(), "tcp") if err == nil { - proxyCfg.Scan(&cfgList) - for _, v := range cfgList { + tcpCfg.Scan(&tcpCfgList) + for _, v := range tcpCfgList { go s.Tcp(v.Key, v.Port, v.LocalPort, v.Ip) - + time.Sleep(2 * time.Second) } } + time.Sleep(1 * time.Second) + type httpCfgType struct { + Key string `json:"key"` + Ip string `json:"ip"` + Port int `json:"port"` + Cname string `json:"cname"` + } + var httpCfgList []*httpCfgType + httpCfg, err := g.Cfg("proxy").Get(gctx.New(), "tcp") + if err == nil { + httpCfg.Scan(&httpCfgList) + for _, v := range httpCfgList { + go s.Http(v.Key, v.Cname, v.Port, v.Ip) + time.Sleep(2 * time.Second) + } + } + +} + +var TcpList = make(map[int]*gtcp.Conn) + +func (s *sP2P) TcpAck(port int) *gtcp.Conn { + if v, ok := TcpList[port]; ok { + return v + } else { + g.Log().Errorf(gctx.New(), "端口:%v不存在", port) + return nil + } } func (s *sP2P) Tcp(key string, toPort, myPort int, ip string) { @@ -42,16 +71,20 @@ func (s *sP2P) Tcp(key string, toPort, myPort int, ip string) { // 建立p2p连接 err := s.DiscoverAndConnect(key) if err != nil { - + g.Log().Errorf(gctx.New(), "连接失败:%v", err) + time.Sleep(3 * time.Second) + s.Tcp(key, toPort, myPort, ip) + return } err = gtcp.NewServer(fmt.Sprintf("0.0.0.0:%d", myPort), func(conn *gtcp.Conn) { + TcpList[toPort] = conn defer conn.Close() for { ctx := gctx.New() data, err := conn.Recv(-1) if len(data) > 0 { - g.Log().Debugf(gctx.New(), "内容:%v", string(data)) + g.Log().Debugf(gctx.New(), "本地接口收到内容:%v", string(data)) err = s.SendP2P(key, "proxy", gjson.MustEncode(&ProxyType{ Ip: ip, Port: toPort, @@ -76,3 +109,19 @@ func (s *sP2P) Tcp(key string, toPort, myPort int, ip string) { } } + +func (s *sP2P) Http(key string, cname string, toPort int, ip string) { + if toPort == 0 { + toPort = 80 + } + + // 建立p2p连接 + err := s.DiscoverAndConnect(key) + if err != nil { + g.Log().Errorf(gctx.New(), "连接失败:%v", err) + time.Sleep(3 * time.Second) + s.Http(key, cname, toPort, ip) + return + } + +} diff --git a/internal/logic/system/update.go b/internal/logic/system/update.go index 2c4721d..8118fd0 100644 --- a/internal/logic/system/update.go +++ b/internal/logic/system/update.go @@ -77,9 +77,9 @@ func (s *sSystem) RestartSelf() error { return nil // 理论上不会执行到这里 } -// RenameRunningFile 重命名正在运行的程序文件(如 p2p.exe → p2p.exe~) +// RenameRunningFile 重命名正在运行的程序文件(如 message.exe → message.exe~) func (s *sSystem) RenameRunningFile(exePath string) (string, error) { - // 目标备份文件名(p2p.exe → p2p.exe~) + // 目标备份文件名(message.exe → message.exe~) backupPath := exePath + "~" // 先删除已存在的备份文件(若有) diff --git a/internal/message/consts.go b/internal/message/consts.go new file mode 100644 index 0000000..c7cc5e3 --- /dev/null +++ b/internal/message/consts.go @@ -0,0 +1,18 @@ +package message + +import ( + v1 "github.com/ayflying/p2p/api/p2p/v1" + "github.com/ayflying/p2p/internal/message/http" + "github.com/ayflying/p2p/internal/message/proxy" + "github.com/ayflying/p2p/internal/message/proxyAck" +) + +type P2PMessage interface { + Message(msg *v1.Message) (err error) +} + +var Run = map[string]P2PMessage{ + "proxy": proxy.New, + "proxy_ack": proxyAck.New, + "http": http.New, +} diff --git a/internal/message/http/http.go b/internal/message/http/http.go new file mode 100644 index 0000000..37731e7 --- /dev/null +++ b/internal/message/http/http.go @@ -0,0 +1,17 @@ +package http + +import v1 "github.com/ayflying/p2p/api/p2p/v1" + +type Http struct { + Ip string `json:"ip"` + Port int `json:"port"` + Cname string `json:"cname"` + Data []byte `json:"data"` +} + +var New = Http{} + +func (h Http) Message(msg *v1.Message) (err error) { + //TODO implement me + panic("implement me") +} diff --git a/internal/message/proxy/proxy.go b/internal/message/proxy/proxy.go new file mode 100644 index 0000000..f6a03d5 --- /dev/null +++ b/internal/message/proxy/proxy.go @@ -0,0 +1,68 @@ +package proxy + +import ( + "fmt" + + "github.com/ayflying/p2p/api/p2p/v1" + "github.com/ayflying/p2p/internal/service" + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/gtime" +) + +var New = Proxy{} + +var ( +//ip, _ = service.P2P().GetIPv4PublicIP() +) + +type Proxy struct { + Ip string `json:"ip"` + Port int `json:"port"` + Data []byte `json:"data"` +} + +func (p Proxy) Message(msg *v1.Message) (err error) { + var data *Proxy + + gjson.DecodeTo(msg.Data, &data) + //g.Dump(data) + // Client + go func() { + if conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)); err == nil { + defer conn.Close() + + err = conn.Send(data.Data) + if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil { + //fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + + err = service.P2P().SendP2P(msg.From, "proxy_ack", gjson.MustEncode(&Proxy{ + Ip: data.Ip, + Port: data.Port, + Data: b, + })) + if err != nil { + g.Log().Errorf(gctx.New(), "发送ACK失败:%v", err) + } + + } else { + fmt.Println(err) + } + + } else { + //glog.Error(err) + } + //time.Sleep(time.Second) + + }() + + //conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)) + //if err != nil { + // g.Log().Errorf(ctx, "连接失败:%v", err) + // return + //} + //defer conn.Close() + return +} diff --git a/internal/message/proxyAck/proxyAck.go b/internal/message/proxyAck/proxyAck.go new file mode 100644 index 0000000..68ea538 --- /dev/null +++ b/internal/message/proxyAck/proxyAck.go @@ -0,0 +1,28 @@ +package proxyAck + +import ( + v1 "github.com/ayflying/p2p/api/p2p/v1" + "github.com/ayflying/p2p/internal/service" + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gctx" +) + +type ProxyAck struct { + Ip string `json:"ip"` + Port int `json:"port"` + Data []byte `json:"data"` +} + +var New = ProxyAck{} + +func (p ProxyAck) Message(msg *v1.Message) (err error) { + var data *ProxyAck + gjson.DecodeTo(msg.Data, &data) + //g.Dump(data) + + g.Log().Debugf(gctx.New(), "收到ACK发送到端口:%v", data.Port) + err = service.P2P().TcpAck(data.Port).Send(data.Data) + + return +} diff --git a/internal/service/p_2_p.go b/internal/service/p_2_p.go index 07481f6..918c559 100644 --- a/internal/service/p_2_p.go +++ b/internal/service/p_2_p.go @@ -9,6 +9,7 @@ import ( "context" "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/net/gtcp" "github.com/gorilla/websocket" "github.com/libp2p/go-libp2p/core/host" ) @@ -38,7 +39,9 @@ type ( // 只获取IPv4公网IP(过滤IPv6结果) GetIPv4PublicIP() (string, error) ProxyInit() + TcpAck(port int) *gtcp.Conn Tcp(key string, toPort int, myPort int, ip string) + Http(key string, cname string, toPort int, ip string) } ) diff --git a/internal/service/system.go b/internal/service/system.go index 66bf346..4567f95 100644 --- a/internal/service/system.go +++ b/internal/service/system.go @@ -15,7 +15,7 @@ type ( Update(ctx context.Context) (err error) // RestartSelf 实现 Windows 平台下的程序自重启 RestartSelf() error - // RenameRunningFile 重命名正在运行的程序文件(如 p2p.exe → p2p.exe~) + // RenameRunningFile 重命名正在运行的程序文件(如 message.exe → message.exe~) RenameRunningFile(exePath string) (string, error) } ) diff --git a/runtime/message.key b/runtime/message.key new file mode 100644 index 0000000..2071557 --- /dev/null +++ b/runtime/message.key @@ -0,0 +1,4 @@ +-----BEGIN LIBP2P PRIVATE KEY----- +CAESQJTRc7CwETt8Pw4dNOP3Vz5w157n2tbm4qfdHDFSybcLoJwGI5vjuCZJhktH +fMuZUxXG/zdvVyv7eLmUIpxNdSI= +-----END LIBP2P PRIVATE KEY-----