增加代理接口

This commit is contained in:
2025-10-30 11:13:47 +08:00
parent 2f4097b697
commit 31d1b3d27e
18 changed files with 274 additions and 104 deletions

View File

@@ -9,14 +9,15 @@ import (
"strconv"
"time"
v1 "github.com/ayflying/p2p/api/p2p/v1"
messageFunc "github.com/ayflying/p2p/internal/message"
"github.com/ayflying/p2p/internal/service"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtcp"
"github.com/gogf/gf/v2/os/gcache"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/grand"
"github.com/gorilla/websocket"
@@ -41,19 +42,12 @@ type Client struct {
//tcp map[string]
}
type Message struct {
Type string `json:"type" dc:"消息类型"`
Port int `json:"port,omitempty" dc:"请求端口"`
Data []byte `json:"data" dc:"消息数据"`
From string `json:"from" dc:"发送方ID"`
}
// SendP2P 发送格式化消息
func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) {
if typ == "" {
typ = "message"
}
message := &Message{
message := &v1.Message{
Type: typ,
From: s.client.Id,
Data: data,
@@ -199,6 +193,14 @@ func (s *sP2P) register() error {
// 发现并连接目标节点
func (s *sP2P) DiscoverAndConnect(targetID string) error {
cacheKey := fmt.Sprintf("%s:%s", s.client.Id, targetID)
get, _ := gcache.Get(gctx.New(), cacheKey)
if !get.IsEmpty() {
return nil
}
// 设置缓存,避免重复发现
gcache.Set(gctx.New(), cacheKey, targetID, 30*time.Second)
// 发送发现请求
msg := GatewayMessage{
Type: MsgTypeDiscover,
@@ -247,45 +249,18 @@ func (s *sP2P) handleStream(stream network.Stream) {
}
// 解析消息
var message *Message
var message *v1.Message
if err := gjson.DecodeTo(msg, &message); err != nil {
g.Log().Debugf(ctx, "解析消息失败: %v", msg)
}
//g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message))
switch message.Type {
case "proxy":
var data *ProxyType
gjson.DecodeTo(message.Data, &data)
//g.Dump(data)
// Client
for {
if conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port)); err == nil {
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
err = s.SendP2P(message.From, "proxy_ack", gjson.MustEncode(&ProxyType{
Ip: ip,
Port: message.Port,
Data: b,
}))
} else {
fmt.Println(err)
}
conn.Close()
} else {
//glog.Error(err)
}
//time.Sleep(time.Second)
if this, ok := messageFunc.Run[message.Type]; ok {
err := this.Message(message)
if err != nil {
g.Log().Error(ctx, err)
}
//conn, err := gtcp.NewConn(fmt.Sprintf("%s:%v", data.Ip, data.Port))
//if err != nil {
// g.Log().Errorf(ctx, "连接失败:%v", err)
// return
//}
//defer conn.Close()
}
}