完成dht的分布式储存

This commit is contained in:
2025-10-15 19:41:11 +08:00
parent 865d6fac65
commit 7184defc50
10 changed files with 226 additions and 53 deletions

View File

@@ -67,7 +67,6 @@ var (
group.Middleware(ghttp.MiddlewareHandlerResponse)
service.P2P().GatewayStart(ctx, group)
})
s.Run()
case "client":
// 获取客户端模式所需的参数
g.Log().Debug(ctx, "开始执行client")
@@ -78,7 +77,24 @@ var (
err = service.P2P().Start(ctx, wsStr)
case "dht":
h, _ := service.P2P().CreateLibp2pHost(ctx, 0)
service.P2P().DHTStart(ctx, h)
err := service.P2P().DHTStart(ctx, h)
if err != nil {
g.Log().Error(ctx, err)
}
publicIp, err := service.P2P().GetIPv4PublicIP()
err = service.P2P().StoreAddrToDHT(ctx, "ip", publicIp)
if err != nil {
return
}
case "dht2":
h, _ := service.P2P().CreateLibp2pHost(ctx, 0)
err := service.P2P().DHTStart(ctx, h)
if err != nil {
g.Log().Error(ctx, err)
}
get, _ := service.P2P().FindAddrFromDHT(ctx, "ip")
g.Dump(get)
default:
// 显示帮助信息
@@ -88,6 +104,8 @@ var (
if err != nil {
return err
}
s.Run()
return
},
}

View File

@@ -0,0 +1,14 @@
package p2p
import (
"context"
"github.com/ayflying/p2p/api/p2p/v1"
"github.com/gogf/gf/v2/frame/g"
)
func (c *ControllerV1) Ip(ctx context.Context, req *v1.IpReq) (res *v1.IpRes, err error) {
ip := g.RequestFromCtx(ctx).GetRemoteIp()
g.RequestFromCtx(ctx).Response.Write(ip)
return
}

View File

@@ -12,13 +12,16 @@ import (
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/grand"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)
@@ -50,7 +53,7 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
}
// 设置流处理函数处理P2P消息
hostObj.SetStreamHandler(ProtocolID, s.handleStream)
hostObj.SetStreamHandler(protocol.ID(ProtocolID), s.handleStream)
// 连接网关WebSocket
if err = s.connectGateway(); err != nil {
@@ -72,18 +75,23 @@ func (s *sP2P) Start(ctx context.Context, wsStr string) (err error) {
// 创建libp2p主机
func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error) {
if port == 0 {
//port = grand.N(50000, 55000)
port = 53533
port = grand.N(50000, 55000)
//port = 53533
}
// 配置监听地址
//listenAddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)
var listenAddrs = []string{
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", port),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), // 随机 TCP 端口
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", port), // 随机 UDP 端口QUIC 协议,提升打洞成功率)
}
// 1. 生成密钥对并初始化节点(确保身份有效)
s.privKey, _, _ = crypto.GenerateKeyPair(crypto.Ed25519, 0) // 推荐使用Ed25519
// 创建主机
h, err := libp2p.New(
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.Identity(s.privKey),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
@@ -91,7 +99,11 @@ func (s *sP2P) CreateLibp2pHost(ctx context.Context, port int) (host.Host, error
//libp2p.NATPortMapTimeout(30*time.Second),
// 禁用Relay如果需要中继可保留
libp2p.DisableRelay(),
libp2p.NATPortMap(), // 自动尝试路由器端口映射(跨网络必备)
)
if err != nil {
return nil, err
}
g.Log().Debugf(ctx, "当前p2p的分享地址%v", h.Addrs())
return h, err
@@ -186,7 +198,7 @@ func (s *sP2P) SendData(targetID string, data []byte) error {
}
// 创建流
stream, err := s.client.host.NewStream(gctx.New(), peerID, ProtocolID)
stream, err := s.client.host.NewStream(gctx.New(), peerID, protocol.ID(ProtocolID))
if err != nil {
return err
}

View File

@@ -4,23 +4,109 @@ import (
"context"
"fmt"
//"github.com/ipfs/boxo/ipns"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
// 初始化无服务器DHT作为节点加入DHT网络
func (s *sP2P) DHTStart(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
// 创建DHT实例设置为“客户端+服务端模式”(既可以查找数据,也可以存储数据)
kdht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer))
if err != nil {
return nil, fmt.Errorf("初始化DHT失败: %v", err)
}
// 启动DHT并加入网络会自动发现网络中的其他DHT节点
if err := kdht.Bootstrap(ctx); err != nil {
return nil, fmt.Errorf("DHT加入网络失败: %v", err)
}
fmt.Println("DHT初始化成功节点ID:", h.ID().ShortString())
return kdht, nil
type DHTType struct {
KadDHT *dht.IpfsDHT
}
// 初始化无服务器DHT作为节点加入DHT网络
func (s *sP2P) DHTStart(ctx context.Context, h host.Host) (err error) {
// 创建自定义 DHT 选项,配置验证器
dhtOpts := []dht.Option{
//设置为“客户端+服务端模式”(既可以查找数据,也可以存储数据
dht.Mode(dht.ModeServer),
}
// 创建DHT实例
s.dht.KadDHT, err = dht.New(
ctx,
h,
dhtOpts...,
//dht.Mode(dht.ModeServer),
)
if err != nil {
err = fmt.Errorf("初始化DHT失败: %v", err)
return
}
// 关键:直接替换 DHT 实例的验证器
// v0.35.1 版本中IpfsDHT 结构体的 Validator 字段是公开可修改的
s.dht.KadDHT.Validator = &NoOpValidator{}
// 连接到DHT bootstrap节点种子节点帮助加入网络
// 这里使用libp2p官方的公共bootstrap节点生产环境可替换为自己的节点
bootstrapPeers := dht.DefaultBootstrapPeers
for _, addr := range bootstrapPeers {
peerInfo, _ := peer.AddrInfoFromP2pAddr(addr)
h.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
if err = h.Connect(ctx, *peerInfo); err != nil {
fmt.Printf("连接bootstrap节点 %s 失败: %v\n", peerInfo.ID, err)
} else {
fmt.Printf("已连接bootstrap节点: %s\n", peerInfo.ID)
}
}
// 启动DHT
if err = s.dht.KadDHT.Bootstrap(ctx); err != nil {
return
}
return
}
// 存储数据到DHT比如存储“目标节点ID-公网地址”的映射)
func (s *sP2P) StoreAddrToDHT(ctx context.Context, key string, addr string) (err error) {
// Key目标节点ID作为哈希键Value公网地址需转成二进制
//key = "/ipns/" + key
//key = s.generateStringDHTKey(key)
key = fmt.Sprintf("%s/%s", ProtocolID, key)
value := []byte(addr)
// 存储数据DHT会自动找到负责存储该Key的节点并同步数据
if err = s.dht.KadDHT.PutValue(ctx, key, value); err != nil {
return fmt.Errorf("key=%s,存储地址到DHT失败: %v", key, err)
}
fmt.Printf("成功存储地址到DHTKey=%s, Value=%s\n", key, addr)
return
}
// 从DHT查找数据比如根据节点ID查找其公网地址
func (s *sP2P) FindAddrFromDHT(ctx context.Context, key string) (string, error) {
// 查找数据DHT会通过路由表层层跳转找到负责存储该Key的节点并获取数据
//key = s.generateStringDHTKey(key)
//key = "/ipns/" + key
key = fmt.Sprintf("%s/%s", ProtocolID, key)
value, err := s.dht.KadDHT.GetValue(ctx, key)
if err != nil {
return "", fmt.Errorf("从DHT查找地址失败: %v", err)
}
addr := string(value)
fmt.Printf("从DHT找到地址Key=%s, Value=%s\n", key, addr)
return addr, nil
}
// 生成符合DHT规范的字符串Key
func (s *sP2P) generateStringDHTKey(str string) string {
return ""
}
// 自定义验证器:不做任何校验,接受所有数据
type NoOpValidator struct{}
// Validate 总是返回成功,允许任何数据
func (v *NoOpValidator) Validate(key string, value []byte) error {
return nil
}
// Select 简单返回第一个数据(不做版本选择)
func (v *NoOpValidator) Select(key string, values [][]byte) (int, error) {
return 0, nil
}

View File

@@ -9,8 +9,9 @@ import (
"time"
"github.com/ayflying/p2p/internal/service"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/crypto"
)
var (
@@ -19,8 +20,15 @@ var (
// 常量定义
const (
ProtocolID protocol.ID = "/ay/1.0.0"
DefaultPort = 51888
ProtocolID string = "/ay/1.0.0"
DefaultPort = 51888
)
var (
ipAPIs = []string{
//"http://ay.cname.com:51888/p2p/ip",
"http://54.67.8.27:51888/p2p/ip",
}
)
type MsgType string
@@ -44,8 +52,9 @@ type RegisterData struct {
type sP2P struct {
Clients map[string]*ClientConn // 客户端ID -> 连接
lock sync.RWMutex
client *Client
dht *DHTType
privKey crypto.PrivKey
client *Client
}
// New 创建一个新的 P2P 服务实例
@@ -53,6 +62,7 @@ func New() *sP2P {
return &sP2P{
Clients: make(map[string]*ClientConn),
client: &Client{},
dht: &DHTType{},
}
}
@@ -63,14 +73,9 @@ func init() {
// 获取公网IP并判断类型ipv4/ipv6
func (s *sP2P) getPublicIPAndType() (ip string, ipType string, err error) {
// 公网IP查询接口多个备用
apis := []string{
"https://api.ip.sb/ip",
"https://ip.3322.net",
"https://ifconfig.cn",
}
client := http.Client{Timeout: 5 * time.Second}
for _, api := range apis {
for _, api := range ipAPIs {
resp, err := client.Get(api)
if err != nil {
continue
@@ -106,30 +111,28 @@ func (s *sP2P) getPublicIPAndType() (ip string, ipType string, err error) {
}
// 只获取IPv4公网IP过滤IPv6结果
func (s *sP2P) getIPv4PublicIP() (string, error) {
func (s *sP2P) GetIPv4PublicIP() (string, error) {
// 优先使用只返回IPv4的API避免IPv6干扰
ipv4OnlyAPIs := []string{
//"https://api.ip.sb/ip",
"https://ip.3322.net",
//"https://ifconfig.cn",
}
client := http.Client{Timeout: 5 * time.Second}
for _, api := range ipv4OnlyAPIs {
resp, err := client.Get(api)
//client := http.Client{Timeout: 5 * time.Second}
for _, api := range ipAPIs {
//resp, err := client.Get(api)
resp, err := g.Client().Timeout(5*time.Second).Get(ctx, api)
if err != nil {
continue
}
defer resp.Body.Close()
defer resp.Close()
//defer resp.Body.Close()
// 读取响应
buf := make([]byte, 128)
n, err := resp.Body.Read(buf)
if err != nil {
continue
}
//buf := make([]byte, 128)
//n, err := resp.Body.Read(buf)
//if err != nil {
// continue
//}
ip := strings.TrimSpace(string(buf[:n]))
//ip := strings.TrimSpace(string(buf[:n]))
ip := strings.TrimSpace(resp.ReadAllString())
if ip == "" {
continue
}

View File

@@ -9,7 +9,6 @@ import (
"context"
"github.com/gogf/gf/v2/net/ghttp"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
)
@@ -23,8 +22,14 @@ type (
// 发送数据到目标节点
SendData(targetID string, data []byte) error
// 初始化无服务器DHT作为节点加入DHT网络
DHTStart(ctx context.Context, h host.Host) (*dht.IpfsDHT, error)
DHTStart(ctx context.Context, h host.Host) (err error)
// 存储数据到DHT比如存储“目标节点ID-公网地址”的映射)
StoreAddrToDHT(ctx context.Context, key string, addr string) (err error)
// 从DHT查找数据比如根据节点ID查找其公网地址
FindAddrFromDHT(ctx context.Context, key string) (string, error)
GatewayStart(ctx context.Context, group *ghttp.RouterGroup) (err error)
// 只获取IPv4公网IP过滤IPv6结果
GetIPv4PublicIP() (string, error)
}
)