增加代理方式的打洞,进行tcp的转发

This commit is contained in:
2025-10-28 19:33:20 +08:00
parent c48c85f075
commit 2f4097b697
9 changed files with 236 additions and 67 deletions

View File

@@ -4,16 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"io"
"path"
"strconv"
"time"
"github.com/ayflying/p2p/internal/service"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtcp"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/grand"
"github.com/gorilla/websocket"
@@ -47,8 +50,11 @@ type Message struct {
// SendP2P 发送格式化消息
func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) {
if typ == "" {
typ = "message"
}
message := &Message{
Type: "message",
Type: typ,
From: s.client.Id,
Data: data,
}
@@ -56,22 +62,6 @@ func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) {
return
}
func (s *sP2P) linkTcp(addr string) {
//conn, err := gtcp.Dial(addr)
// 使用标准库 net.Dial 建立连接
stdConn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Printf("Dial error: %v\n", err)
return
}
defer stdConn.Close()
//// 将标准库的 net.Conn 封装为 gtcp.Conn
//gtcpConn := gtcp.NewConn(stdConn)
//defer gtcpConn.Close()
}
func (s *sP2P) Start(wsStr string) (err error) {
var ctx = gctx.New()
hostObj, err := s.CreateLibp2pHost(ctx, 0)
@@ -92,20 +82,22 @@ func (s *sP2P) Start(wsStr string) (err error) {
// 设置流处理函数处理P2P消息
hostObj.SetStreamHandler(protocol.ID(ProtocolID), s.handleStream)
// 连接网关WebSocket
if err = s.connectGateway(); err != nil {
g.Log().Fatalf(ctx, "连接网关失败: %v", err)
for {
// 连接网关WebSocket
if err = s.connectGateway(); err != nil {
g.Log().Errorf(ctx, "连接网关失败,60秒后重试: %v", err)
time.Sleep(60 * time.Second)
} else {
break
}
}
// 启动网关消息接收协程
go s.receiveGatewayMessages(ctx)
g.Log().Infof(ctx, "已连接网关成功客户端ID: %s", s.client.Id)
//g.Log().Infof(ctx,"当前地址http://127.0.0.1/")
//启动代理初始化
s.ProxyInit()
//select {
//case <-ctx.Done():
//}
return
}
@@ -226,23 +218,76 @@ func (s *sP2P) DiscoverAndConnect(targetID string) error {
func (s *sP2P) handleStream(stream network.Stream) {
ctx := gctx.New()
defer stream.Close()
//var err error
peerID := stream.Conn().RemotePeer().String()
//peerID := stream.Conn().RemotePeer().String()
//glog.Infof(ctx, "收到来自 %s 的连接", peerID)
// 读取数据
buf := make([]byte, 1024)
n, err := stream.Read(buf)
if err != nil {
glog.Errorf(ctx, "读取流数据失败: %v", err)
return
var msg []byte
for {
n, err := stream.Read(buf)
msg = append(msg, buf[:n]...)
// 再判断错误
if err != nil {
if err == io.EOF {
// EOF 是正常结束,不算错误
err = nil
break
} else {
return
}
}
//if err != nil {
// glog.Errorf(ctx, "读取流数据失败: %v", err)
// return
//}
}
var msg = buf[:n]
// 解析消息
var message *Message
err = gjson.DecodeTo(msg, &message)
g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message))
if err := gjson.DecodeTo(msg, &message); err != nil {
g.Log().Debugf(ctx, "解析消息失败: %v", msg)
}
//g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message))
switch message.Type {
case "proxy":
var data *ProxyType
gjson.DecodeTo(message.Data, &data)
//g.Dump(data)
// Client
for {
if conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)); err == nil {
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
err = s.SendP2P(message.From, "proxy_ack", gjson.MustEncode(&ProxyType{
Ip: ip,
Port: message.Port,
Data: b,
}))
} else {
fmt.Println(err)
}
conn.Close()
} else {
//glog.Error(err)
}
//time.Sleep(time.Second)
}
//conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port))
//if err != nil {
// g.Log().Errorf(ctx, "连接失败:%v", err)
// return
//}
//defer conn.Close()
}
}
// 发送数据到目标节点
@@ -331,9 +376,9 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) {
for {
_, data, err := s.client.wsConn.ReadMessage()
if err != nil {
glog.Errorf(ctx, "接收网关消息失败: %v", err)
g.Log().Errorf(ctx, "接收网关消息失败: %v", err)
gtimer.SetTimeout(ctx, 30*time.Second, func(ctx context.Context) {
gtimer.SetTimeout(ctx, 10*time.Second, func(ctx context.Context) {
err = s.connectGateway()
return
})
@@ -346,11 +391,11 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) {
continue
}
//// 验证消息是否发给自己to必须是当前客户端ID或空
//if msg.To != "" && msg.To != s.client.Id {
// g.Log().Debugf(ctx, "忽略非本客户端的消息from=%s, to=%s", msg.From, msg.To)
// continue
//}
// 验证消息是否发给自己to必须是当前客户端ID或空
if msg.To != "" && msg.To != s.client.Id {
g.Log().Debugf(ctx, "忽略非本客户端的消息from=%s, to=%s", msg.From, msg.To)
continue
}
// 处理不同类型消息
switch msg.Type {
@@ -370,7 +415,7 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) {
}
if !msgData.Found {
fmt.Println("gateway未找到目标节点")
g.Log().Debug(ctx, "gateway未找到目标节点")
continue
}
@@ -434,7 +479,9 @@ func (s *sP2P) receiveGatewayMessages(ctx context.Context) {
// 更新器路径(假设与主程序同目录)
//updaterPath := filepath.Join(filepath.Dir(selfPath), "updater.exe")
g.Log().Infof(ctx, "更新节点信息: %v", data)
g.Log().Info(ctx, "文件接收完成")
// 开始覆盖文件与重启
err = service.System().Update(ctx)
//// 调用不同系统的更新服务
//service.OS().Update(msgData.Version, msgData.Server)