127 lines
2.8 KiB
Go
127 lines
2.8 KiB
Go
package cmd
|
||
|
||
import (
|
||
"context"
|
||
"time"
|
||
"uploads3/internal/service"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gcmd"
|
||
"github.com/gogf/gf/v2/os/gctx"
|
||
"github.com/gogf/gf/v2/os/gfile"
|
||
"github.com/gogf/gf/v2/text/gstr"
|
||
)
|
||
|
||
// helpDescription 定义P2P命令的详细帮助信息
|
||
const helpDescription = `
|
||
S3上传工具使用帮助:
|
||
-p,path 本地文件夹路径
|
||
-u,upload_path S3上传根路径
|
||
-w,worker 并发数,默认10,最大50
|
||
`
|
||
|
||
var (
|
||
path string
|
||
uploadPath string
|
||
maxCount int
|
||
uploadCount int = 0
|
||
// 管道长度
|
||
workerCount = 10 // 固定并发数100
|
||
|
||
Main = gcmd.Command{
|
||
Name: "main",
|
||
Usage: "main",
|
||
Brief: "start http server",
|
||
// Description 提供命令的详细描述和使用帮助
|
||
Description: helpDescription,
|
||
Func: func(ctx context.Context, parser *gcmd.Parser) (err error) {
|
||
parser, err = gcmd.Parse(g.MapStrBool{
|
||
"p,path": true,
|
||
"u,upload_path": true,
|
||
"w,worker": true,
|
||
})
|
||
|
||
//s := g.Server()
|
||
//s.Group("/", func(group *ghttp.RouterGroup) {
|
||
// group.Middleware(ghttp.MiddlewareHandlerResponse)
|
||
// group.Bind(
|
||
// hello.NewV1(),
|
||
// )
|
||
//})
|
||
//s.Run()
|
||
workerCount = parser.GetOpt("worker", workerCount).Int()
|
||
if workerCount > 50 {
|
||
workerCount = 50
|
||
}
|
||
path = parser.GetOpt("path").String()
|
||
uploadPath = parser.GetOpt("upload_path").String()
|
||
if path == "" || uploadPath == "" {
|
||
g.Log().Errorf(ctx, "path 或 upload_path 为空")
|
||
return
|
||
}
|
||
|
||
S3(path)
|
||
|
||
return nil
|
||
},
|
||
}
|
||
)
|
||
|
||
var UploadTask = make(chan string, workerCount*5)
|
||
|
||
func S3(path string) {
|
||
list, _ := gfile.ScanDirFile(path, "*", true)
|
||
maxCount = len(list)
|
||
g.Log().Debugf(gctx.New(), "当前需要处理的文件数量:%v", len(list))
|
||
go func() {
|
||
for _, v := range list {
|
||
UploadTask <- v
|
||
}
|
||
}()
|
||
time.Sleep(1 * time.Second)
|
||
startWorkers()
|
||
}
|
||
|
||
// 启动100个worker,持续处理任务
|
||
func startWorkers() {
|
||
|
||
// 启动100个worker
|
||
for i := 0; i < workerCount; i++ {
|
||
ctx := gctx.New()
|
||
go func() {
|
||
// 持续从管道取任务,直到管道关闭且所有任务处理完毕
|
||
for {
|
||
select {
|
||
case filename := <-UploadTask:
|
||
//执行上传任务
|
||
uploadToS3(ctx, filename)
|
||
case <-ctx.Done():
|
||
// 上下文取消时,退出循环
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// 等待所有任务处理完毕
|
||
for {
|
||
if len(UploadTask) == 0 {
|
||
return
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
func uploadToS3(ctx context.Context, filename string) {
|
||
//todo 实现上传到S3的逻辑
|
||
uploadCount++
|
||
|
||
filepath := gstr.Replace(filename, path, uploadPath)
|
||
filepath = gstr.Replace(filepath, "\\", "/")
|
||
g.Log().Debugf(ctx, "(%d,%d)上传到s3:%v", uploadCount, maxCount, filepath)
|
||
//time.Sleep(grand.D(10*time.Millisecond, time.Second))
|
||
f, _ := gfile.Open(filename)
|
||
service.S3().PutObject(ctx, f, filepath)
|
||
return
|
||
}
|