mirror of
https://github.com/ayflying/p2p.git
synced 2026-03-04 17:29:22 +00:00
可以完成内网连接,外网打洞失败
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gtimer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
@@ -17,7 +18,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
@@ -48,17 +48,13 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
|
||||
peers: make(map[string]peer.ID),
|
||||
}
|
||||
|
||||
g.Log().Debugf(ctx, "当前p2p的分享地址:%v", hostObj.Addrs())
|
||||
// 设置流处理函数(处理P2P消息)
|
||||
hostObj.SetStreamHandler(ProtocolID, s.handleStream)
|
||||
|
||||
// 连接网关(WebSocket)
|
||||
if err = s.connectGateway(); err != nil {
|
||||
glog.Fatalf(ctx, "连接网关失败: %v", err)
|
||||
}
|
||||
|
||||
// 注册到网关
|
||||
if err = s.register(); err != nil {
|
||||
glog.Fatalf(ctx, "注册到网关失败: %v", err)
|
||||
g.Log().Fatalf(ctx, "连接网关失败: %v", err)
|
||||
}
|
||||
|
||||
// 启动网关消息接收协程
|
||||
@@ -76,14 +72,15 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
|
||||
// 创建libp2p主机
|
||||
func (s *sP2P) createLibp2pHost(ctx context.Context, port int) (host.Host, error) {
|
||||
// 配置监听地址
|
||||
listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)
|
||||
//listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)
|
||||
|
||||
// 创建主机
|
||||
h, err := libp2p.New(
|
||||
libp2p.ListenAddrStrings(listenAddr),
|
||||
libp2p.DefaultTransports,
|
||||
libp2p.DefaultMuxers,
|
||||
libp2p.DefaultSecurity,
|
||||
////libp2p.ListenAddrStrings(listenAddr),
|
||||
//libp2p.ListenAddrs(),
|
||||
//libp2p.DefaultTransports,
|
||||
//libp2p.DefaultMuxers,
|
||||
//libp2p.DefaultSecurity,
|
||||
)
|
||||
return h, err
|
||||
}
|
||||
@@ -97,6 +94,11 @@ func (s *sP2P) connectGateway() (err error) {
|
||||
//defer conn.Close()
|
||||
|
||||
s.client.wsConn = conn
|
||||
|
||||
// 注册到网关
|
||||
if err = s.register(); err != nil {
|
||||
g.Log().Fatalf(ctx, "注册到网关失败: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -164,7 +166,7 @@ func (s *sP2P) SendData(targetID string, data []byte) error {
|
||||
}
|
||||
|
||||
// 创建流
|
||||
stream, err := s.client.host.NewStream(gctx.New(), peerID, protocol.ID("/p2p-chat/1.0.0"))
|
||||
stream, err := s.client.host.NewStream(gctx.New(), peerID, ProtocolID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -242,6 +244,11 @@ func (s *sP2P) receiveGatewayMessages() {
|
||||
_, data, err := s.client.wsConn.ReadMessage()
|
||||
if err != nil {
|
||||
glog.Errorf(ctx, "接收网关消息失败: %v", err)
|
||||
|
||||
gtimer.SetTimeout(ctx, 5*time.Second, func(ctx context.Context) {
|
||||
err = s.connectGateway()
|
||||
return
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -251,22 +258,23 @@ func (s *sP2P) receiveGatewayMessages() {
|
||||
continue
|
||||
}
|
||||
|
||||
// 验证消息是否发给自己(to必须是当前客户端ID或空)
|
||||
if msg.To != "" && msg.To != s.client.Id {
|
||||
g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To)
|
||||
continue
|
||||
}
|
||||
//// 验证消息是否发给自己(to必须是当前客户端ID或空)
|
||||
//if msg.To != "" && msg.To != s.client.Id {
|
||||
// g.Log().Debugf(ctx, "忽略非本客户端的消息,from=%s, to=%s", msg.From, msg.To)
|
||||
// continue
|
||||
//}
|
||||
|
||||
// 处理不同类型消息
|
||||
switch msg.Type {
|
||||
case MsgTypeRegisterAck:
|
||||
glog.Infof(ctx, "注册成功")
|
||||
g.Log().Infof(ctx, "注册成功")
|
||||
|
||||
case MsgTypeDiscoverAck:
|
||||
var msgData struct {
|
||||
Found bool `json:"found"`
|
||||
PeerID string `json:"peer_id,omitempty"`
|
||||
Addrs []string `json:"addrs,omitempty"`
|
||||
Found bool `json:"found"`
|
||||
PeerID string `json:"peer_id,omitempty"`
|
||||
Addrs []string `json:"addrs,omitempty"`
|
||||
TargetID string `json:"target_id"`
|
||||
}
|
||||
if err = gjson.DecodeTo(msg.Data, &msgData); err != nil {
|
||||
g.Log().Errorf(ctx, "解析发现响应失败: %v", err)
|
||||
@@ -274,7 +282,7 @@ func (s *sP2P) receiveGatewayMessages() {
|
||||
}
|
||||
|
||||
if !msgData.Found {
|
||||
fmt.Println("未找到目标节点")
|
||||
fmt.Println("gateway未找到目标节点")
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -310,7 +318,7 @@ func (s *sP2P) receiveGatewayMessages() {
|
||||
|
||||
g.Log().Infof(ctx, "成功连接到目标节点:%v", targetID)
|
||||
s.client.peers[targetID] = targetPeerID
|
||||
}(peerID, msg.To)
|
||||
}(peerID, msgData.TargetID)
|
||||
|
||||
case MsgTypePunchRequest:
|
||||
err = s.handlePunchRequest(msg.Data)
|
||||
|
||||
@@ -217,7 +217,7 @@ func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) {
|
||||
fromClient.LastActive = time.Now()
|
||||
s.lock.Unlock()
|
||||
|
||||
if targetClient != nil {
|
||||
if targetClient == nil {
|
||||
// 目标不存在
|
||||
s.sendMessage(conn, GatewayMessage{
|
||||
Type: MsgTypeDiscoverAck,
|
||||
@@ -237,10 +237,10 @@ func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) {
|
||||
From: "gateway", // 发送方是网关
|
||||
To: msg.From, // 接收方是原请求方
|
||||
Data: gjson.MustEncode(g.Map{
|
||||
"found": true,
|
||||
"peer_id": targetClient.PeerID,
|
||||
//"addrs": s.getAddrsJSON(targetClient.Addrs),
|
||||
"addrs": targetClient.Addrs,
|
||||
"found": true,
|
||||
"peer_id": targetClient.PeerID,
|
||||
"addrs": targetClient.Addrs,
|
||||
"target_id": data.TargetID,
|
||||
}),
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user