增加更新重启逻辑

This commit is contained in:
2025-10-17 17:03:19 +08:00
parent 7184defc50
commit 248f9a28e7
24 changed files with 604 additions and 65 deletions

View File

@@ -4,9 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"strconv"
"time"
"github.com/ayflying/p2p/internal/service"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
@@ -33,6 +35,41 @@ type Client struct {
host host.Host
wsConn *websocket.Conn // WebSocket连接
peers map[string]peer.ID // 存储已连接的节点
//tcp map[string]
}
type Message struct {
Type string `json:"type" dc:"消息类型"`
Port int `json:"port,omitempty" dc:"请求端口"`
Data []byte `json:"data" dc:"消息数据"`
From string `json:"from" dc:"发送方ID"`
}
// SendP2P 发送格式化消息
func (s *sP2P) SendP2P(targetID string, typ string, data []byte) (err error) {
message := &Message{
Type: "message",
From: s.client.Id,
Data: data,
}
err = s.sendData(targetID, gjson.MustEncode(message))
return
}
func (s *sP2P) linkTcp(addr string) {
//conn, err := gtcp.Dial(addr)
// 使用标准库 net.Dial 建立连接
stdConn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Printf("Dial error: %v\n", err)
return
}
defer stdConn.Close()
//// 将标准库的 net.Conn 封装为 gtcp.Conn
//gtcpConn := gtcp.NewConn(stdConn)
//defer gtcpConn.Close()
}
func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
@@ -177,7 +214,7 @@ func (s *sP2P) handleStream(stream network.Stream) {
defer stream.Close()
peerID := stream.Conn().RemotePeer().String()
glog.Infof(ctx, "收到来自 %s 的连接", peerID)
//glog.Infof(ctx, "收到来自 %s 的连接", peerID)
// 读取数据
buf := make([]byte, 1024)
@@ -187,11 +224,15 @@ func (s *sP2P) handleStream(stream network.Stream) {
return
}
g.Log().Debugf(ctx, "收到来自 %s 的消息: %v> ", peerID, string(buf[:n]))
var msg = buf[:n]
// 解析消息
var message *Message
err = gjson.DecodeTo(msg, &message)
g.Log().Debugf(ctx, "收到来自 %s 的消息: %v ", peerID, gjson.MustEncodeString(message))
}
// 发送数据到目标节点
func (s *sP2P) SendData(targetID string, data []byte) error {
func (s *sP2P) sendData(targetID string, data []byte) error {
peerID, exists := s.client.peers[targetID]
if !exists {
return fmt.Errorf("未找到目标节点 %s 的连接", targetID)
@@ -325,6 +366,8 @@ func (s *sP2P) receiveGatewayMessages() {
continue
}
g.Log().Infof(ctx, "准备开始打洞到目标节点:%v", msgData.TargetID)
addrs := make([]multiaddr.Multiaddr, len(msgData.Addrs))
for i, addrStr := range msgData.Addrs {
addr, err := multiaddr.NewMultiaddr(addrStr)
@@ -361,6 +404,21 @@ func (s *sP2P) receiveGatewayMessages() {
}
json.Unmarshal(msg.Data, &data)
glog.Errorf(ctx, "网关错误: %s", data.Error)
case MsgUpdate: //更新节点信息
var msgData struct {
Server string `json:"server"`
Version string `json:"version"`
}
//var msgData *dataType
json.Unmarshal(msg.Data, &msgData)
// 更新器路径(假设与主程序同目录)
//updaterPath := filepath.Join(filepath.Dir(selfPath), "updater.exe")
g.Log().Infof(ctx, "更新节点信息: %v", data)
// 调用不同系统的更新服务
service.OS().Update(msgData.Version, msgData.Server)
}
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
//"github.com/ipfs/boxo/ipns"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
@@ -73,7 +74,8 @@ func (s *sP2P) StoreAddrToDHT(ctx context.Context, key string, addr string) (err
if err = s.dht.KadDHT.PutValue(ctx, key, value); err != nil {
return fmt.Errorf("key=%s,存储地址到DHT失败: %v", key, err)
}
fmt.Printf("成功存储地址到DHTKey=%s, Value=%s\n", key, addr)
g.Log().Info(ctx, "成功存储地址到DHTKey=%s, Value=%s", key, addr)
return
}
@@ -83,6 +85,7 @@ func (s *sP2P) FindAddrFromDHT(ctx context.Context, key string) (string, error)
//key = s.generateStringDHTKey(key)
//key = "/ipns/" + key
key = fmt.Sprintf("%s/%s", ProtocolID, key)
g.Log().Debugf(ctx, "从DHT查找地址中...Key=%s", key)
value, err := s.dht.KadDHT.GetValue(ctx, key)
if err != nil {
return "", fmt.Errorf("从DHT查找地址失败: %v", err)

View File

@@ -39,8 +39,9 @@ const (
MsgTypeDiscover = "discover" // 发现目标客户端
MsgTypeDiscoverAck = "discover_ack" // 发现响应
MsgTypeConnectionReq = "connection_req" // 连接请求通知
MsgTypePunchRequest = "punch_request"
MsgTypeError = "error" // 错误消息
MsgTypePunchRequest = "punch_request" // 打洞请求
MsgTypeError = "error" // 错误消息
MsgUpdate = "update" // 更新节点信息
)
// 注册请求数据