成功完成不同内网的通讯

This commit is contained in:
2025-10-15 14:13:50 +08:00
parent e5e8c1c19f
commit c52d5359e9
7 changed files with 338 additions and 35 deletions

View File

@@ -77,6 +77,7 @@ var (
//addrs := "WyIvaXA0LzEyNy4wLjAuMS90Y3AvNTE4ODgiLCIvaXA0LzE5Mi4xNjguNTAuMTczL3RjcC81MTg4OCJd"
wsStr := "ws://192.168.50.173:51888/ws"
err = service.P2P().Start(ctx, wsStr)
case "dht":
default:
// 显示帮助信息

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
@@ -48,7 +49,6 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
peers: make(map[string]peer.ID),
}
g.Log().Debugf(ctx, "当前p2p的分享地址%v", hostObj.Addrs())
// 设置流处理函数处理P2P消息
hostObj.SetStreamHandler(ProtocolID, s.handleStream)
@@ -71,17 +71,29 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
// 创建libp2p主机
func (s *sP2P) createLibp2pHost(ctx context.Context, port int) (host.Host, error) {
if port == 0 {
//port = grand.N(50000, 55000)
port = 53533
}
// 配置监听地址
//listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)
var listenAddrs = []string{
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", port),
}
// 创建主机
h, err := libp2p.New(
////libp2p.ListenAddrStrings(listenAddr),
//libp2p.ListenAddrs(),
//libp2p.DefaultTransports,
//libp2p.DefaultMuxers,
//libp2p.DefaultSecurity,
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
// 增加NAT端口映射尝试时间
//libp2p.NATPortMapTimeout(30*time.Second),
// 禁用Relay如果需要中继可保留
libp2p.DisableRelay(),
)
g.Log().Debugf(ctx, "当前p2p的分享地址%v", h.Addrs())
return h, err
}
@@ -89,24 +101,32 @@ func (s *sP2P) createLibp2pHost(ctx context.Context, port int) (host.Host, error
func (s *sP2P) connectGateway() (err error) {
conn, _, err := websocket.DefaultDialer.Dial(s.client.gatewayURL, nil)
if err != nil {
gtimer.SetTimeout(ctx, 3*time.Minute, func(ctx context.Context) {
err = s.connectGateway()
return
})
return fmt.Errorf("WebSocket连接失败: %v", err)
}
//defer conn.Close()
s.client.wsConn = conn
g.Log().Infof(ctx, "已连接网关成功客户端ID: %s", s.client.Id)
// 注册到网关
if err = s.register(); err != nil {
g.Log().Fatalf(ctx, "注册到网关失败: %v", err)
}
g.Log().Infof(ctx, "已注册到网关客户端ID: %s", s.client.Id)
return
}
// 注册到网关
func (s *sP2P) register() error {
selfAddrs := s.client.host.Peerstore().Addrs(s.client.host.ID())
// 收集地址信息
addrs := make([]string, len(s.client.host.Addrs()))
for i, addr := range s.client.host.Addrs() {
addrs := make([]string, len(selfAddrs))
for i, addr := range selfAddrs {
addrs[i] = addr.String()
}
@@ -245,7 +265,7 @@ func (s *sP2P) receiveGatewayMessages() {
if err != nil {
glog.Errorf(ctx, "接收网关消息失败: %v", err)
gtimer.SetTimeout(ctx, 5*time.Second, func(ctx context.Context) {
gtimer.SetTimeout(ctx, 30*time.Second, func(ctx context.Context) {
err = s.connectGateway()
return
})
@@ -332,3 +352,31 @@ func (s *sP2P) receiveGatewayMessages() {
}
}
}
// 提取libp2p节点的本地TCP监听端口
func (s *sP2P) getLocalTCPPorts(host host.Host) ([]int, error) {
ports := make(map[int]struct{}) // 去重
// 遍历所有本地监听地址
for _, addr := range host.Addrs() {
// 提取TCP端口
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
continue // 跳过非TCP地址
}
port, err := strconv.Atoi(portStr)
if err != nil {
continue
}
ports[port] = struct{}{}
}
// 转换为切片返回
result := make([]int, 0, len(ports))
for port := range ports {
result = append(result, port)
}
return result, nil
}

View File

@@ -1 +1,26 @@
package p2p
import (
"context"
"fmt"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
)
// 初始化无服务器DHT作为节点加入DHT网络
func (s *sP2P) InitDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
// 创建DHT实例设置为“客户端+服务端模式”(既可以查找数据,也可以存储数据)
kdht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer))
if err != nil {
return nil, fmt.Errorf("初始化DHT失败: %v", err)
}
// 启动DHT并加入网络会自动发现网络中的其他DHT节点
if err := kdht.Bootstrap(ctx); err != nil {
return nil, fmt.Errorf("DHT加入网络失败: %v", err)
}
fmt.Println("DHT初始化成功节点ID:", h.ID().ShortString())
return kdht, nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
"time"
@@ -111,6 +112,22 @@ func (s *sP2P) handleRegister(conn *websocket.Conn, msg GatewayMessage) {
return
}
// 追加公网ip
publicIp, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
ParseIP := net.ParseIP(publicIp)
var ipType string
if ParseIP.To4() != nil {
ipType = "ip4"
} else {
ipType = "ip6"
}
port2 := 53533
data.Addrs = append(data.Addrs, fmt.Sprintf("/%s/%s/tcp/%d", ipType, publicIp, port2))
data.Addrs = append(data.Addrs, fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, publicIp, port2))
// 过滤回环地址
data.Addrs = s.filterLoopbackAddrs(data.Addrs)
// 保存客户端信息
client := &ClientConn{
ID: msg.From,
@@ -253,7 +270,7 @@ func (s *sP2P) handleDiscover(conn *websocket.Conn, msg GatewayMessage) {
"from_id": msg.From,
"peer_id": fromClient.PeerID,
//"addrs": s.getAddrsJSON(fromClient.Addrs),
"addrs": targetClient.Addrs,
"addrs": fromClient.Addrs,
}),
})
}

View File

@@ -1,7 +1,12 @@
package p2p
import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/ayflying/p2p/internal/service"
"github.com/gogf/gf/v2/os/gctx"
@@ -54,3 +59,117 @@ func New() *sP2P {
func init() {
service.RegisterP2P(New())
}
// 获取公网IP并判断类型ipv4/ipv6
func (s *sP2P) getPublicIPAndType() (ip string, ipType string, err error) {
// 公网IP查询接口多个备用
apis := []string{
"https://api.ip.sb/ip",
"https://ip.3322.net",
"https://ifconfig.cn",
}
client := http.Client{Timeout: 5 * time.Second}
for _, api := range apis {
resp, err := client.Get(api)
if err != nil {
continue
}
defer resp.Body.Close()
// 读取响应纯IP字符串
buf := make([]byte, 128)
n, err := resp.Body.Read(buf)
if err != nil {
continue
}
ip = strings.TrimSpace(string(buf[:n]))
if ip == "" {
continue
}
// 判断IP类型
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
continue // 无效IP格式
}
if parsedIP.To4() != nil {
return ip, "ipv4", nil // IPv4
} else if parsedIP.To16() != nil {
return ip, "ipv6", nil // IPv6
}
}
return "", "", fmt.Errorf("所有公网IP查询接口均失败")
}
// 只获取IPv4公网IP过滤IPv6结果
func (s *sP2P) getIPv4PublicIP() (string, error) {
// 优先使用只返回IPv4的API避免IPv6干扰
ipv4OnlyAPIs := []string{
//"https://api.ip.sb/ip",
"https://ip.3322.net",
//"https://ifconfig.cn",
}
client := http.Client{Timeout: 5 * time.Second}
for _, api := range ipv4OnlyAPIs {
resp, err := client.Get(api)
if err != nil {
continue
}
defer resp.Body.Close()
// 读取响应
buf := make([]byte, 128)
n, err := resp.Body.Read(buf)
if err != nil {
continue
}
ip := strings.TrimSpace(string(buf[:n]))
if ip == "" {
continue
}
// 严格验证是否为IPv4过滤IPv6
parsedIP := net.ParseIP(ip)
if parsedIP != nil && parsedIP.To4() != nil { // 确保是IPv4
return ip, nil
}
}
return "", fmt.Errorf("所有IPv4公网查询接口均失败或返回非IPv4地址")
}
// 过滤地址列表排除127.0.0.1回环地址
func (s *sP2P) filterLoopbackAddrs(addrStrs []string) []string {
var filtered []string
for _, addrStr := range addrStrs {
// 直接过滤包含127.0.0.1的地址字符串
if strings.Contains(addrStr, "/ip4/127.0.0.1/") {
continue // 跳过回环地址
}
filtered = append(filtered, addrStr)
}
// 移除重复地址
filtered = s.removeDuplicates(filtered)
return filtered
}
// 去除字符串切片中的重复元素,保持首次出现的顺序
func (s *sP2P) removeDuplicates(strs []string) []string {
seen := make(map[string]bool) // 用于记录已出现的字符串
result := make([]string, 0, len(strs)) // 结果切片,预分配容量
for _, s := range strs {
if !seen[s] { // 如果未出现过
seen[s] = true // 标记为已出现
result = append(result, s) // 添加到结果
}
}
return result
}