增加dht的去中心化储存

This commit is contained in:
2025-10-21 15:29:43 +08:00
parent 248f9a28e7
commit ab2f099826
14 changed files with 517 additions and 87 deletions

View File

@@ -3,102 +3,58 @@ package p2p
import (
"context"
"fmt"
"log"
"net"
"time"
"github.com/gogf/gf/v2/crypto/gsha1"
"github.com/gogf/gf/v2/frame/g"
//"github.com/ipfs/boxo/ipns"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
)
type DHTType struct {
KadDHT *dht.IpfsDHT
KadDHT *dht.IpfsDHT
bootstrapPeers []string
}
var (
//bootstrapPeers = []string{
// "/ip4/192.168.50.173/tcp/53486/p2p/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw",
// "/ip4/192.168.50.173/udp/53486/quic-v1/p2p/12D3KooWE3v9623SLukT9dKUQLjqAJrPvzoyRjoUh5MAVGDg69Rw",
//}
)
// 初始化无服务器DHT作为节点加入DHT网络
func (s *sP2P) DHTStart(ctx context.Context, h host.Host) (err error) {
// 创建自定义 DHT 选项,配置验证器
dhtOpts := []dht.Option{
//设置为“客户端+服务端模式”(既可以查找数据,也可以存储数据)
dht.Mode(dht.ModeServer),
}
func (s *sP2P) DHTStart(ctx context.Context, h host.Host, bootstrapPeers []string) (err error) {
//打印节点地址(供其他节点手动加入时使用)
s.printNodeAddrs(h)
s.dht.bootstrapPeers = bootstrapPeers
// 创建DHT实例
s.dht.KadDHT, err = dht.New(
ctx,
h,
dhtOpts...,
//dht.Mode(dht.ModeServer),
)
// 2. 通过官方 Bootstrap 节点加入公共 DHT 网络(完全去中心化入口)
s.dht.KadDHT, err = s.joinGlobalDHT(ctx, h)
if err != nil {
err = fmt.Errorf("初始化DHT失败: %v", err)
return
log.Fatalf("加入 DHT 网络失败: %v", err)
}
fmt.Println("✅ 成功加入完全去中心化 DHT 网络")
// 关键:直接替换 DHT 实例的验证器
// v0.35.1 版本中IpfsDHT 结构体的 Validator 字段是公开可修改的
s.dht.KadDHT.Validator = &NoOpValidator{}
// 连接到DHT bootstrap节点种子节点帮助加入网络
// 这里使用libp2p官方的公共bootstrap节点生产环境可替换为自己的节点
bootstrapPeers := dht.DefaultBootstrapPeers
for _, addr := range bootstrapPeers {
peerInfo, _ := peer.AddrInfoFromP2pAddr(addr)
h.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
if err = h.Connect(ctx, *peerInfo); err != nil {
fmt.Printf("连接bootstrap节点 %s 失败: %v\n", peerInfo.ID, err)
} else {
fmt.Printf("已连接bootstrap节点: %s\n", peerInfo.ID)
}
}
// 启动DHT
if err = s.dht.KadDHT.Bootstrap(ctx); err != nil {
return
}
// 3. 定期打印路由表(观察节点自动发现效果)
go s.printRoutingTable(s.dht.KadDHT, 60*time.Second)
return
}
// 存储数据到DHT比如存储“目标节点ID-公网地址”的映射)
func (s *sP2P) StoreAddrToDHT(ctx context.Context, key string, addr string) (err error) {
// Key目标节点ID作为哈希键Value公网地址需转成二进制
//key = "/ipns/" + key
//key = s.generateStringDHTKey(key)
key = fmt.Sprintf("%s/%s", ProtocolID, key)
value := []byte(addr)
// 存储数据DHT会自动找到负责存储该Key的节点并同步数据
if err = s.dht.KadDHT.PutValue(ctx, key, value); err != nil {
return fmt.Errorf("key=%s,存储地址到DHT失败: %v", key, err)
}
g.Log().Info(ctx, "成功存储地址到DHTKey=%s, Value=%s", key, addr)
return
}
// 从DHT查找数据比如根据节点ID查找其公网地址
func (s *sP2P) FindAddrFromDHT(ctx context.Context, key string) (string, error) {
// 查找数据DHT会通过路由表层层跳转找到负责存储该Key的节点并获取数据
//key = s.generateStringDHTKey(key)
//key = "/ipns/" + key
key = fmt.Sprintf("%s/%s", ProtocolID, key)
g.Log().Debugf(ctx, "从DHT查找地址中...Key=%s", key)
value, err := s.dht.KadDHT.GetValue(ctx, key)
if err != nil {
return "", fmt.Errorf("从DHT查找地址失败: %v", err)
}
addr := string(value)
fmt.Printf("从DHT找到地址Key=%s, Value=%s\n", key, addr)
return addr, nil
}
// 生成符合DHT规范的字符串Key
func (s *sP2P) generateStringDHTKey(str string) string {
return ""
return gsha1.Encrypt(str)
//fullKey := fmt.Sprintf("%s/%s", ProtocolID, str)
//hash, _ := multihash.Sum([]byte(fullKey), multihash.SHA2_256, -1)
//return ipns.key
}
// 自定义验证器:不做任何校验,接受所有数据
@@ -113,3 +69,192 @@ func (v *NoOpValidator) Validate(key string, value []byte) error {
func (v *NoOpValidator) Select(key string, values [][]byte) (int, error) {
return 0, nil
}
// 加入全球公共 DHT 网络(通过官方 Bootstrap 节点,实现完全去中心化)
func (s *sP2P) joinGlobalDHT(ctx context.Context, localHost host.Host) (*dht.IpfsDHT, error) {
// 创建 DHT 实例ModeServer作为完整节点参与存储和路由
kadDHT, err := dht.New(ctx, localHost, dht.Mode(dht.ModeServer))
if err != nil {
return nil, err
}
kadDHT.Validator = &NoOpValidator{}
success := false
if len(s.dht.bootstrapPeers) > 0 {
fmt.Println("正在连接本地种子节点...")
seedPeers, _ := s.parseSeedNodes(s.dht.bootstrapPeers)
for _, p := range seedPeers {
localHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err = localHost.Connect(ctx, p); err != nil {
fmt.Printf("⚠️ 连接本地种子节点 %s 失败: %v\n", p.ID.ShortString(), err)
} else {
fmt.Printf("✅ 连接本地种子节点成功: %s\n", p.ID.ShortString())
}
if err != nil {
fmt.Printf("⚠️ 连接私有节点 %s 失败: %v\n", p.ID.ShortString(), err)
continue
}
fmt.Printf("✅ 连接本地种子节点成功: %s\n", p.ID.ShortString())
success = true
}
if !success {
return nil, fmt.Errorf("所有本地种子节点连接失败")
}
} else {
// 连接 libp2p 官方 Bootstrap 节点(仅作为初始入口)
officialBootstrapPeers := dht.DefaultBootstrapPeers // 官方节点列表
fmt.Println("正在连接官方 Bootstrap 节点(初始入口)...")
for _, addr := range officialBootstrapPeers {
peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
fmt.Printf("⚠️ 解析官方节点失败: %v\n", err)
continue
}
// 添加节点地址到本地地址簿
localHost.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
// 尝试连接(超时 10 秒)
connCtx, connCancel := context.WithTimeout(ctx, 10*time.Second)
err = localHost.Connect(connCtx, *peerInfo)
connCancel()
if err != nil {
fmt.Printf("⚠️ 连接官方节点 %s 失败: %v\n", peerInfo.ID.ShortString(), err)
continue
}
fmt.Printf("✅ 连接官方节点成功: %s\n", peerInfo.ID.ShortString())
success = true
}
// 只要连接上至少一个官方节点,即可加入网络(后续会自动发现更多节点)
if !success {
return nil, fmt.Errorf("无法连接任何官方 Bootstrap 节点,无法加入网络")
}
}
// 启动 DHT自动发现其他节点构建路由表脱离对官方节点的依赖
if err = kadDHT.Bootstrap(ctx); err != nil {
return nil, fmt.Errorf("DHT 初始化失败: %v", err)
}
// 等待路由表初步填充(新增:给路由表留出初始化时间)
time.Sleep(5 * time.Second)
return kadDHT, nil
}
// StoreToDHT 存储数据到 DHT自动分布式存储
func (s *sP2P) StoreToDHT(ctx context.Context, key string, value string) (err error) {
key = s.generateStringDHTKey(key)
key = fmt.Sprintf("%s/%s", ProtocolID, key)
g.Log().Debugf(ctx, "StoreToDHT key: %s, value: %s", key, value)
// 存储到本地
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
if err = s.dht.KadDHT.PutValue(ctx, key, []byte(value)); err != nil {
return fmt.Errorf("本地存储失败: %v", err)
}
return
}
// FindFromDHT 从 DHT 查找数据(从网络节点获取)
func (s *sP2P) FindFromDHT(ctx context.Context, key string) (string, error) {
maxRetries := 10 // 最多重试5次
retryInterval := 60 * time.Second // 每次重试间隔2秒本地网络快
key = s.generateStringDHTKey(key)
key = fmt.Sprintf("%s/%s", ProtocolID, key)
g.Log().Debugf(ctx, "FindFromDHT key: %s", key)
// 1. 先检查本地是否存储了数据(本地节点可能已保存)
localValue, err := s.dht.KadDHT.GetValue(ctx, key)
if err == nil {
g.Log().Debugf(ctx, "✅ 本地查找成功(数据在当前节点)")
return string(localValue), nil
}
g.Log().Debugf(ctx, "⚠️ 本地查找失败: %v开始重试网络查找...", err)
// 2. 多次重试网络查找
for i := 0; i < maxRetries; i++ {
ctx2, cancel := context.WithTimeout(ctx, 120*time.Second) // 本地测试超时短一些
defer cancel()
g.Log().Debugf(ctx2, "🔍 第%d次查找共%d次...", i+1, maxRetries)
value, err := s.dht.KadDHT.GetValue(ctx2, key)
if err == nil {
g.Log().Debugf(ctx2, "✅ 第%d次查找成功", i+1)
return string(value), nil
}
g.Log().Debugf(ctx2, "⚠️ 第%d次查找失败: %v等待重试...", i+1, err)
time.Sleep(retryInterval)
}
return "", fmt.Errorf("超过最大重试次数(%d次未找到数据", maxRetries)
}
// 定期打印路由表(观察节点自动发现情况)
func (s *sP2P) printRoutingTable(kadDHT *dht.IpfsDHT, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
<-ticker.C
peers := kadDHT.RoutingTable().ListPeers()
fmt.Printf("\n📊 当前路由表节点数: %d完全去中心化网络节点\n", len(peers))
if len(peers) > 0 {
fmt.Println("前 5 个节点 ID:")
for i, p := range peers[:min(5, len(peers))] {
fmt.Printf(" %d. %s\n", i+1, p.ShortString())
}
}
}
}
//// 定期打印节点状态(公网地址+路由表)
//func (s *sP2P) printStatus(interval time.Duration) {
// ticker := time.NewTicker(interval)
// for {
// <-ticker.C
// //publicIp, err := service.P2P().GetIPv4PublicIP()
// //publicAddrs := s.getPublicAddrs()
// peers := s.dht.KadDHT.RoutingTable().ListPeers()
// fmt.Printf("\n===== 节点状态 =====")
// fmt.Printf("\n公网地址数: %d0表示穿透失败\n", len(publicAddrs))
// fmt.Printf("路由表节点数: %d自动扩散结果\n", len(peers))
// fmt.Println("====================")
// }
//}
// 打印节点地址(供其他节点手动加入时使用)
func (s *sP2P) printNodeAddrs(host host.Host) {
fmt.Println("节点地址公网地址将自动同步到DHT:")
for _, addr := range host.Addrs() {
fullAddr := fmt.Sprintf("%s/p2p/%s", addr, host.ID())
ipStr, _ := addr.ValueForProtocol(multiaddr.P_IP4)
ip := net.ParseIP(ipStr)
if ip.IsPrivate() || ip.IsLoopback() {
fmt.Printf(" [内网] %s\n", fullAddr)
} else {
fmt.Printf(" [公网] %s\n", fullAddr)
}
}
}
func (s *sP2P) parseSeedNodes(seedAddrs []string) ([]peer.AddrInfo, error) {
peers := make([]peer.AddrInfo, 0, len(seedAddrs))
for _, addrStr := range seedAddrs {
addr, err := multiaddr.NewMultiaddr(addrStr)
if err != nil {
return nil, err
}
peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
return nil, err
}
peers = append(peers, *peerInfo)
}
return peers, nil
}