diff --git a/config/proxy.yaml b/config/proxy.yaml new file mode 100644 index 0000000..f528857 --- /dev/null +++ b/config/proxy.yaml @@ -0,0 +1,5 @@ +#tcp: +# - key: "12D3KooWQsb1137nCzqbMMCzwHsyU8aaCZeFnBUBTkWVsfp8gs26" +# port: 51888 +# ip: ay.cname.com +# local_port: 20000 diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 7943aa8..fc8db8d 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -37,18 +37,14 @@ var ( s := g.Server(consts.Name) - parser, err = gcmd.Parse(g.MapStrBool{ - "p,port": true, - }) - //port := parser.GetOpt("port", "23333").Int() - parser, err = gcmd.Parse(g.MapStrBool{ "w,ws": true, "g,gateway": true, "p,port": true, + "t,type": true, }) - addr := g.Cfg().MustGet(ctx, "ws.address").String() - ws := parser.GetOpt("ws", addr).String() + //addr := g.Cfg().MustGet(ctx, "ws.address").String() + ws := parser.GetOpt("ws").String() if ws == "" { listVar := g.Cfg().MustGet(ctx, "p2p.list") var p2pItem []struct { @@ -64,9 +60,10 @@ var ( } port := parser.GetOpt("port", 0).Int() - if port > 0 { - s.SetPort(port) - } + s.SetPort(port) + //if port > 0 { + // s.SetPort(port) + //} s.Group("/", func(group *ghttp.RouterGroup) { group.Middleware(ghttp.MiddlewareHandlerResponse) @@ -85,15 +82,18 @@ var ( } }) - // 延迟启动 - gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { - g.Log().Debug(ctx, "开始执行客户端") - // 启动p2p客户端 - err = service.P2P().Start(ws) + startType := parser.GetOpt("type").String() + if startType != "server" { + // 延迟启动 + gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) { + g.Log().Debug(ctx, "开始执行客户端") + // 启动p2p客户端 + err = service.P2P().Start(ws) - g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) - - }) + g.Log().Debugf(ctx, "当前监听端口:%v", s.GetListenedPort()) + }) + //s.SetPort(0) + } // 启动系统托盘 service.OS().Load(consts.Name, consts.Name+"服务端", "manifest/images/favicon.ico") diff --git a/internal/controller/system/system_v1_update.go b/internal/controller/system/system_v1_update.go index 6d8df31..4825ce0 100644 --- a/internal/controller/system/system_v1_update.go +++ b/internal/controller/system/system_v1_update.go @@ -40,20 +40,20 @@ func (c *ControllerV1) Update(ctx context.Context, req *v1.UpdateReq) (res *v1.U } } - //更新文件 - err = service.System().Update(ctx) type DataType struct { File []byte `json:"file"` Name string `json:"name"` } + //var GatewayMessage *p2p.GatewayMessage + var msgData = struct { Files []*DataType `json:"files"` }{} msgData.Files = []*DataType{} - files, _ := gfile.ScanDir("download", ".*gz") + files, _ := gfile.ScanDir("download", "*.gz") for _, v := range files { msgData.Files = append(msgData.Files, &DataType{ @@ -63,5 +63,8 @@ func (c *ControllerV1) Update(ctx context.Context, req *v1.UpdateReq) (res *v1.U } service.P2P().SendAll("update", msgData) + + //更新自己的文件 + //err = service.System().Update(ctx) return } diff --git a/internal/logic/os/windows.go b/internal/logic/os/windows.go index cbddeb2..9220c6f 100644 --- a/internal/logic/os/windows.go +++ b/internal/logic/os/windows.go @@ -14,6 +14,14 @@ import ( "github.com/gogf/gf/v2/os/gfile" ) +// 引入 Windows API 函数 +var ( + user32 = syscall.NewLazyDLL("user32.dll") + kernel32 = syscall.NewLazyDLL("kernel32.dll") + showWindow = user32.NewProc("ShowWindow") + getConsoleWnd = kernel32.NewProc("GetConsoleWindow") +) + func (s *sOS) start() { // 系统托盘初始化(设置图标、右键菜单) @@ -22,6 +30,7 @@ func (s *sOS) start() { // 系统托盘初始化(设置图标、右键菜单) func (s *sOS) onSystrayReady() { + //s.hideConsole() iconByte := gfile.GetBytes(s.systray.Icon) systray.SetIcon(iconByte) @@ -29,7 +38,7 @@ func (s *sOS) onSystrayReady() { systray.SetTooltip(s.systray.Tooltip) mQuit := systray.AddMenuItem("退出", "退出应用") - systray.AddMenuItemCheckbox("隐藏窗口", "隐藏窗口", false) + mShow := systray.AddMenuItemCheckbox("显示窗口", "显示窗口", false) // Sets the icon of a menu item. Only available on Mac and Windows. //mQuit.SetIcon(iconByte) go func() { @@ -37,6 +46,9 @@ func (s *sOS) onSystrayReady() { select { case <-mQuit.ClickedCh: systray.Quit() + case <-mShow.ClickedCh: + // 显示窗口 + s.showConsole() } } @@ -62,3 +74,25 @@ func (s *sOS) update(version, server string) { return } } + +// 隐藏控制台窗口 +func (s *sOS) hideConsole() { + // 获取当前控制台窗口句柄 + hWnd, _, _ := getConsoleWnd.Call() + if hWnd == 0 { + return // 无控制台窗口(如编译为GUI子系统时) + } + // SW_HIDE = 0:隐藏窗口 + showWindow.Call(hWnd, 0) +} + +// 显示控制台窗口 +func (s *sOS) showConsole() { + // 获取当前控制台窗口句柄 + hWnd, _, _ := getConsoleWnd.Call() + if hWnd == 0 { + return + } + // SW_SHOW = 5:显示窗口 + showWindow.Call(hWnd, 5) +} diff --git a/internal/logic/p2p/client.go b/internal/logic/p2p/client.go index ae34181..838b5d2 100644 --- a/internal/logic/p2p/client.go +++ b/internal/logic/p2p/client.go @@ -4,16 +4,19 @@ import ( "context" "encoding/json" "fmt" - "net" + "io" "path" "strconv" "time" + "github.com/ayflying/p2p/internal/service" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gfile" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/os/gtimer" "github.com/gogf/gf/v2/util/grand" "github.com/gorilla/websocket" @@ -47,8 +50,11 @@ type Message struct { // SendP2P 发送格式化消息 func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) { + if typ == "" { + typ = "message" + } message := &Message{ - Type: "message", + Type: typ, From: s.client.Id, Data: data, } @@ -56,22 +62,6 @@ func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) { return } -func (s *sP2P) linkTcp(addr string) { - //conn, err := gtcp.Dial(addr) - - // 使用标准库 net.Dial 建立连接 - stdConn, err := net.Dial("tcp", addr) - if err != nil { - fmt.Printf("Dial error: %v\n", err) - return - } - defer stdConn.Close() - - //// 将标准库的 net.Conn 封装为 gtcp.Conn - //gtcpConn := gtcp.NewConn(stdConn) - //defer gtcpConn.Close() -} - func (s *sP2P) Start(wsStr string) (err error) { var ctx = gctx.New() hostObj, err := s.CreateLibp2pHost(ctx, 0) @@ -92,20 +82,22 @@ func (s *sP2P) Start(wsStr string) (err error) { // 设置流处理函数(处理P2P消息) hostObj.SetStreamHandler(protocol.ID(ProtocolID), s.handleStream) - // 连接网关(WebSocket) - if err = s.connectGateway(); err != nil { - g.Log().Fatalf(ctx, "连接网关失败: %v", err) + for { + // 连接网关(WebSocket) + if err = s.connectGateway(); err != nil { + g.Log().Errorf(ctx, "连接网关失败,60秒后重试: %v", err) + time.Sleep(60 * time.Second) + } else { + break + } } // 启动网关消息接收协程 go s.receiveGatewayMessages(ctx) - g.Log().Infof(ctx, "已连接网关成功,客户端ID: %s", s.client.Id) - //g.Log().Infof(ctx,"当前地址:http://127.0.0.1/") + //启动代理初始化 + s.ProxyInit() - //select { - //case <-ctx.Done(): - //} return } @@ -226,23 +218,76 @@ func (s *sP2P) DiscoverAndConnect(targetID string) error { func (s *sP2P) handleStream(stream network.Stream) { ctx := gctx.New() defer stream.Close() + //var err error - peerID := stream.Conn().RemotePeer().String() + //peerID := stream.Conn().RemotePeer().String() //glog.Infof(ctx, "收到来自 %s 的连接", peerID) // 读取数据 buf := make([]byte, 1024) - n, err := stream.Read(buf) - if err != nil { - glog.Errorf(ctx, "读取流数据失败: %v", err) - return + var msg []byte + + for { + n, err := stream.Read(buf) + msg = append(msg, buf[:n]...) + // 再判断错误 + if err != nil { + if err == io.EOF { + // EOF 是正常结束,不算错误 + err = nil + break + } else { + return + } + } + //if err != nil { + // glog.Errorf(ctx, "读取流数据失败: %v", err) + // return + //} } - var msg = buf[:n] // 解析消息 var message *Message - err = gjson.DecodeTo(msg, &message) - g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message)) + if err := gjson.DecodeTo(msg, &message); err != nil { + g.Log().Debugf(ctx, "解析消息失败: %v", msg) + } + + //g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message)) + switch message.Type { + case "proxy": + var data *ProxyType + gjson.DecodeTo(message.Data, &data) + //g.Dump(data) + // Client + for { + if conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)); err == nil { + if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil { + fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + + err = s.SendP2P(message.From, "proxy_ack", gjson.MustEncode(&ProxyType{ + Ip: ip, + Port: message.Port, + Data: b, + })) + } else { + fmt.Println(err) + } + conn.Close() + } else { + //glog.Error(err) + } + //time.Sleep(time.Second) + } + + //conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)) + //if err != nil { + // g.Log().Errorf(ctx, "连接失败:%v", err) + // return + //} + //defer conn.Close() + + } + } // 发送数据到目标节点 @@ -331,9 +376,9 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) { for { _, data, err := s.client.wsConn.ReadMessage() if err != nil { - glog.Errorf(ctx, "接收网关消息失败: %v", err) + g.Log().Errorf(ctx, "接收网关消息失败: %v", err) - gtimer.SetTimeout(ctx, 30*time.Second, func(ctx context.Context) { + gtimer.SetTimeout(ctx, 10*time.Second, func(ctx context.Context) { err = s.connectGateway() return }) @@ -346,11 +391,11 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) { continue } - //// 验证消息是否发给自己(to必须是当前客户端ID或空) - //if msg.To != "" && msg.To != s.client.Id { - // g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To) - // continue - //} + // 验证消息是否发给自己(to必须是当前客户端ID或空) + if msg.To != "" && msg.To != s.client.Id { + g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To) + continue + } // 处理不同类型消息 switch msg.Type { @@ -370,7 +415,7 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) { } if !msgData.Found { - fmt.Println("gateway未找到目标节点") + g.Log().Debug(ctx, "gateway未找到目标节点") continue } @@ -434,7 +479,9 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) { // 更新器路径(假设与主程序同目录) //updaterPath := filepath.Join(filepath.Dir(selfPath), "updater.exe") - g.Log().Infof(ctx, "更新节点信息: %v", data) + g.Log().Info(ctx, "文件接收完成") + // 开始覆盖文件与重启 + err = service.System().Update(ctx) //// 调用不同系统的更新服务 //service.OS().Update(msgData.Version, msgData.Server) diff --git a/internal/logic/p2p/gateway.go b/internal/logic/p2p/gateway.go index 9edeaf1..608f5ac 100644 --- a/internal/logic/p2p/gateway.go +++ b/internal/logic/p2p/gateway.go @@ -140,7 +140,7 @@ func (s *sP2P) handleRegister(ctx context.Context, conn *websocket.Conn, msg *Ga s.Clients[msg.From] = client s.lock.Unlock() - glog.Infof(ctx, "客户端 ip=%s,%s 注册成功,PeerID: %s", conn.RemoteAddr(), msg.From, data.PeerID) + g.Log().Infof(ctx, "客户端 ip=%s,%s 注册成功,PeerID: %s", conn.RemoteAddr(), msg.From, data.PeerID) // 发送注册成功响应 err := s.sendMessage(conn, &GatewayMessage{ diff --git a/internal/logic/p2p/p2p.go b/internal/logic/p2p/p2p.go index 3d3efe4..7aad023 100644 --- a/internal/logic/p2p/p2p.go +++ b/internal/logic/p2p/p2p.go @@ -26,7 +26,7 @@ var ( // 常量定义 const ( - ProtocolID string = "/ay" + ProtocolID string = "/ay/p2p/1.0.0" //DefaultPort = 51888 ) diff --git a/internal/logic/p2p/proxy.go b/internal/logic/p2p/proxy.go new file mode 100644 index 0000000..6a1a009 --- /dev/null +++ b/internal/logic/p2p/proxy.go @@ -0,0 +1,78 @@ +package p2p + +import ( + "fmt" + + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/os/gctx" +) + +type ProxyType struct { + Ip string `json:"ip"` + Port int `json:"port"` + Data []byte `json:"data"` +} + +func (s *sP2P) ProxyInit() { + type cfgType struct { + Key string `json:"key"` + Ip string `json:"ip"` + Port int `json:"port"` + LocalPort int `json:"local_port"` + } + var cfgList []*cfgType + proxyCfg, err := g.Cfg("proxy").Get(gctx.New(), "tcp") + if err == nil { + proxyCfg.Scan(&cfgList) + for _, v := range cfgList { + go s.Tcp(v.Key, v.Port, v.LocalPort, v.Ip) + + } + } + +} + +func (s *sP2P) Tcp(key string, toPort, myPort int, ip string) { + if ip == "" { + ip = "127.0.0.1" + } + + // 建立p2p连接 + err := s.DiscoverAndConnect(key) + if err != nil { + + } + err = gtcp.NewServer(fmt.Sprintf("0.0.0.0:%d", myPort), func(conn *gtcp.Conn) { + defer conn.Close() + for { + ctx := gctx.New() + data, err := conn.Recv(-1) + + if len(data) > 0 { + g.Log().Debugf(gctx.New(), "内容:%v", string(data)) + err = s.SendP2P(key, "proxy", gjson.MustEncode(&ProxyType{ + Ip: ip, + Port: toPort, + Data: data, + })) + if err != nil { + g.Log().Errorf(ctx, "发送失败:%v", err) + s.Tcp(key, toPort, myPort, ip) + return + } + //if err = conn.Send(append([]byte("> "), data...)); err != nil { + // fmt.Println(err) + //} + } + if err != nil { + break + } + } + }).Run() + if err != nil { + g.Log().Error(gctx.New(), err) + } + +} diff --git a/internal/service/p_2_p.go b/internal/service/p_2_p.go index f8a610c..07481f6 100644 --- a/internal/service/p_2_p.go +++ b/internal/service/p_2_p.go @@ -37,6 +37,8 @@ type ( Send(conn *websocket.Conn, typ string, data any) (err error) // 只获取IPv4公网IP(过滤IPv6结果) GetIPv4PublicIP() (string, error) + ProxyInit() + Tcp(key string, toPort int, myPort int, ip string) } )