目录
  • 1. 选择命令行包
  • 2. 读取配置,连接数据库
  • 3. 读取文件
    • 3.1. 并发读
    • 3.2. 使用excelize处理excel
    • 3.3. 使用mpb在命令行输出进度显示
  • 4. 写入mongodb
    • 5. 同步mysql
      • 6. 总结

        需求:完成一个命令工具,批量处理某个目录下面的一些excel,将这些excel数据导入到mongodb,同时可以同步到mysql

        :: 花了一天时间写完代码,代码库位置:https://gitee.com/foz/lib/tree/master/ecc

        代码目录:

        ├─cmd
        |  └─ecc.go     # 命令
        ├─configs
        ├─data
        ├─internal
        │  └─importing  # 主要逻辑处理
        ├─pkg           # 处理文件读取、连接数据库等
        │  ├─files
        │  ├─mongo
        │  └─mysql
        ├─queue
        └─tools

        1. 选择命令行包

        平常使用的的命令工具包有:

        • urfave/cli
        • spf13/cobra

        这里使用的是urfave/cli包,比较简单

        var DirPath = "../data"     // 默认位置
        var dir = DirPath
        app := &cli.App{
        		Name:  "Ecc",
        		Usage: "Ecc is a tools for batch processing of excel data",
        		Flags: []cli.Flag{
        			&cli.StringFlag{
        				Name:        "model",
        				Aliases:     []string{"m"},
        				Usage:       "The model of searching",
        				Value:       "model",
        				Destination: &model,
        			},
        			&cli.StringFlag{    // 设置一个 -d 的参数,用来确定目标文件夹位置
        				Name:        "dir",
        				Aliases:     []string{"d"},
        				Usage:       "Folder location of data files",
        				Destination: &dir,
        				Value:       DirPath,
        		},
        		Action: func(c *cli.Context) error {
        			importing.Load("../configs/cfg.yaml")  // 引入配置文件,读取mongodb、mysql等配置
        			importing.Handle(dir)  ## 具体逻辑处理
        			return nil
        	}

        2. 读取配置,连接数据库

        读取配置使用spf13/viper库,需要读取一下配置,连接mongodb

        var C Config
        
        type Config struct {
        	Env   string `yaml:"env"`
        	Mongo struct {
        		DNS        string `yaml:"dns"`
        		Db         string `yaml:"db"`
        		Collection string `yaml:"collection"`
        	} `yaml:"mongo"`
        	Mysql struct {
        		Alias string `yaml:"alias"`
        		Dns   string `yaml:"dns"`
        	} `yaml:"mysql"`
        }
        func Load(cf string) {
        	var err error
        	viper.SetConfigFile(cf)
        	if err = viper.ReadInConfig(); err != nil {
        		log.Fatal(fmt.Errorf("fatal error config file: %s \n", err))
        	}
        	if err = viper.Unmarshal(&configs.C); err != nil {
        		log.Fatal(fmt.Errorf("unmarshal conf failed, err:%s \n", err))
        	if err = mongo.Conn(configs.C.Mongo.DNS, configs.C.Mongo.Db); err != nil {
        		log.Fatal(color.RedString("%s:\n%v", "mongo connect err", err))
        	if mongo.CheckCollection(configs.C.Mongo.Collection) {
        		if err = mongo.DelCollection(configs.C.Mongo.Collection); err != nil {
        			log.Fatal(color.RedString("%s:\n%v", "mongo del collection err", err))
        		}
        	if err = mongo.CreateCollection(configs.C.Mongo.Collection); err != nil {
        		log.Fatal(color.RedString("%s:\n%v", "mongo create collection err", err))

        3. 读取文件

        先确定文件权限以及文件是否存在

        func ReadDir(dir string) ([]os.FileInfo, error) {
        	perm := checkPermission(dir)
        	if perm == true {
        		return nil, fmt.Errorf("permission denied dir: %s", dir)
        	}
        
        	if isNotExistDir(dir) {
        		return nil, fmt.Errorf("does not exist dir: %s", dir)
        	files, err := ioutil.ReadDir(dir)
        	if err == nil {
        		return files, err
        	return nil, fmt.Errorf("ReadDir: %s, err: %v", dir, err)
        }

        拿到文件后就要并发读取每个excel文件数据

        这里需求是一次任务必须读完所有的文件,任何一个文件有错误就退出程序。

        :: 所以需要定义异常退出信道和一个完成读取两个信道,总的数据使用sync.Map安全并发写入。

        3.1. 并发读

        rWait   = true
        rDone   = make(chan struct{})
        rCrash  = make(chan struct{})
        
        read(f, dir, data)
        for rWait {  		// 使用for循环来阻塞读文件
        	select {
        	case <-rCrash:
        		abort("-> Failure")
        		return
        	case <-rDone:
        		rWait = false
        	}
        }
        func read(fs []os.FileInfo, dir string, data *sync.Map) {
        	for _, file := range fs {
        		fileName := file.Name()
        		_ext := filepath.Ext(fileName)
        		if Include(strings.Split(Exts, ","), _ext) {
        			wg.Add(1)
        			inCh := make(chan File)
        			go func() {
        				defer wg.Done()
        				select {
        				case <-rCrash:
        					return // 退出goroutine
        				case f := <-inCh:
        					e, preData := ReadExcel(f.FilePath, f.FileName, pb)
        					if e != nil {
        						tools.Red("%v", e)
        						// 使用sync.once防止多个goroutine关闭同一个信道
        						once.Do(func() { 
        							close(rCrash)
        						})
        						return
        					}
        					data.Store(f.FileName, preData)
        				}
        			}()
        				inCh <- File{
        					FileName: fileName,
        					FilePath: dir + string(os.PathSeparator) + fileName,
        		}
        	go func() {
        		wg.Wait()
        		close(rDone)
        	}()

        3.2. 使用excelize处理excel

        excelize是一个非常好用的excel处理库,这里使用这个库读取excel文件内容

        type ExcelPre struct {
        	FileName    string
        	Data        [][]string
        	Fields      []string
        	Prefixes    string
        	ProgressBar *mpb.Bar  // 进度条对象
        }
        
        func ReadExcel(filePath, fileName string, pb *mpb.Progress) (err error, pre *ExcelPre) {
        	f, err := excelize.OpenFile(filePath)
        	if err != nil {
        		return err, nil
        	}
        	defer func() {
        		if _e := f.Close(); _e != nil {
        			fmt.Printf("%s: %v.\n\n", filePath, _e)
        		}
        	}()
        	// 获取第一页数据
        	firstSheet := f.WorkBook.Sheets.Sheet[0].Name
        	rows, err := f.GetRows(firstSheet)
        	lRows := len(rows)
        	if lRows < 2 {
        		lRows = 2
        	rb := ReadBar(lRows, filePath, pb)
        	wb := WriteBar(lRows-2, filePath, rb, pb)
        	var fields []string
        	var data [][]string
                // 进度条增加一格
        	InCr := func(start time.Time) {
        		rb.Increment()
        		rb.DecoratorEwmaUpdate(time.Since(start))
        	for i := 0; i < lRows; i++ {
        		InCr(time.Now())
        		// 这里对第一行处理,用来判断一些约定的条件
        		if i == 0 {
        			fields = rows[i]
        			for index, field := range fields {
        				if isChinese := regexp.MustCompile("[\u4e00-\u9fa5]"); isChinese.MatchString(field) || field == "" {
        					err = errors.New(fmt.Sprintf("%s: line 【A%d】 field 【%s】 \n", filePath, index, field) + "The first line of the file is not a valid attribute name.")
        					return err, nil
        				}
        			}
        			continue
        		// 过滤第二行,这一行通常是中文解释字段
        		if i == 1 {
        		data = append(data, rows[i])
        	return nil, &ExcelPre{
        		FileName:    fileName,
        		Data:        data,
        		Fields:      fields,
        		Prefixes:    Prefix(fileName),
        		ProgressBar: wb,

        3.3. 使用mpb在命令行输出进度显示

        mpb是一个很好用的命令行进度输出库,上面代码里里有两个进度条,一个是读进度条,第二个是写进度条,读进度条在文件读取的时候就显示了,返回的结构体里有写进度条对象,便于后面写操作时候显示。

        下面是两个进度条显示的配置,具体参数可以看这个库的文档。

        func ReadBar(total int, name string, pb *mpb.Progress) *mpb.Bar {
        	return pb.AddBar(int64(total),
        		mpb.PrependDecorators(
        			decor.OnComplete(decor.Name(color.YellowString("reading"), decor.WCSyncSpaceR), color.YellowString("waiting")),
        			decor.CountersNoUnit("%d / %d", decor.WCSyncWidth, decor.WCSyncSpaceR),
        		),
        		mpb.AppendDecorators(
        			decor.NewPercentage("%.2f:", decor.WCSyncSpaceR),
        			decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth),
        			decor.Name(": "+name),
        	)
        }
        
        func WriteBar(total int, name string, beforeBar *mpb.Bar, pb *mpb.Progress) *mpb.Bar {
        		mpb.BarQueueAfter(beforeBar, false),
        		mpb.BarFillerClearOnComplete(),
        			decor.OnComplete(decor.Name(color.YellowString("writing"), decor.WCSyncSpaceR), color.GreenString("done")),
        			decor.OnComplete(decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), ""),
        			decor.OnComplete(decor.NewPercentage("%.2f:", decor.WCSyncSpaceR), ""),
        			decor.OnComplete(decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth), ""),
        			decor.OnComplete(decor.Name(": "+name), name),

        4. 写入mongodb

        同写入操作,这里拿到所有数据,然后使用goroutine并发写入mongodb,在处理数据时候需要查重,还需要记录一下本次操作插入了哪些数据的_id值,在报错的时候进行删除(这里可以使用事务,直接删除简单些),所以定义了一个Shuttle结构体用来在记录并发时的数据。

        wWait   = true
        wDone   = make(chan struct{})
        wCrash  = make(chan struct{})
        
        type Shuttle struct {
        	Hid []string  	// 用来判断是否是重复数据
        	Mid []string  	// 用来记录本次插入的数据_id
        	mu  sync.Mutex
        }
        func (s *Shuttle) Append(t string, str string) {
        	s.mu.Lock()
        	defer s.mu.Unlock()
        	switch t {
        	case "h":
        		s.Hid = append(s.Hid, str)
        	case "m":
        		s.Mid = append(s.Mid, str)
        	}
        write2mongo(data)
        for wWait {
        	select {
        	case <-wCrash:
        		abort("-> Failure")
        		return
        	case <-wDone:
        		wWait = false
        func write2mongo(data *sync.Map) {
        	collection := mongo.GetCollection(configs.C.Mongo.Collection)
        	data.Range(func(key, value interface{}) bool {
        		if v, ok := value.(*ExcelPre); ok {
        			wg.Add(1)
        			inCh := make(chan []bson.M)
        			go func() {
        				defer wg.Done()
        				select {
        				case <-wCrash:
        					return // exit
        				case rows := <-inCh:
        					e := Write2Mongo(rows, collection, v, &shuttle)
        					if e != nil {
        						tools.Red("%v", e)
        						once.Do(func() {
        							close(wCrash)
        						})
        						return
        					}
        				}
        			}()
        				inCh <- PreWrite(v)
        		}
        		return true
        	})
        	go func() {
        		wg.Wait()
        		close(wDone)
        	}()
        // 具体处理逻辑
        func Write2Mongo(rows []bson.M, collection *mongoDb.Collection, v *ExcelPre, s *Shuttle) error {
        	v.ProgressBar.SetCurrent(0)
        	incr := func(t time.Time, b *mpb.Bar, n int64) {
        		b.IncrInt64(n)
        		b.DecoratorEwmaUpdate(time.Since(t))
        	for _, row := range rows {
        		start := time.Now()
        		key := v.Prefixes + "@@" + row["_hid"].(string)
        		s.mu.Lock()
        		if Include(s.Hid, key) {
        			s.mu.Unlock()
        			incr(start, v.ProgressBar, 1)
        			continue
        		} else {
        			s.Hid = append(s.Hid, key)
        		var err error
        		var id primitive.ObjectID
        		if id, err = mongo.CreateDocs(collection, row); err != nil {
        			return errors.New(fmt.Sprintf("%s:\n%v", "mongo create docs err", err))
        		s.Append("m", id.Hex())
        		incr(start, v.ProgressBar, 1)
        	return nil

        5. 同步mysql

        因为同步mysql不是必要的,这里使用命令行输入进行判断:

        tools.Yellow("-> Whether to sync data to mysql? (y/n)")
        if !tools.Scan("aborted") {
        	return
        } else {
        	tools.Yellow("-> Syncing data to mysql...")
        	if err = write2mysql(); err != nil {
        		tools.Red("-> Failure:" + err.Error())
        	} else {
        		tools.Green("-> Success.")
        	}
        }

        连接mysql数据库,拿到当前monogodb的数据:

        func write2mysql() error {
        	if err := mysql.Conn(configs.C.Mysql.Dns); err != nil {
        		return err
        	}
        
        	d, err := mongo.GetCollectionAllData(configs.C.Mongo.Collection)
        	if err != nil {
        	err = Write2Mysql(d)
        	return err
        }

        创建表,直接拼sql就完事了:

        func CreateTable(tableName string, fields []string) error {
        	var err error
        	delSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableName)
        	err = Db.Exec(delSql).Error
        	if err != nil {
        		return err
        	}
        
        	s := "id bigint(20) NOT NULL PRIMARY KEY"
        	for _, field := range fields {
        		s += fmt.Sprintf(",%s varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL", field)
        	sql := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci", tableName, s)
        	err = Db.Exec(sql).Error
        	return nil
        }

        插入数据,bson.M本身就是一个map,转一下使用gorm分批插入数据,速度快一点:

        func InsertData(tableName string, fields []string, data []bson.M) error {
        	var err error
        	var maps []map[string]interface{}
        	for _, d := range data {
        		row := make(map[string]interface{})
        		for _, field := range fields {
        			row[field] = d[field]
        		}
        		if row != nil {
        			row["id"] = d["id"].(string)
        			maps = append(maps, row)
        	}
        
        	if len(maps) > 0 {
        		err = Db.Table(tableName).CreateInBatches(maps, 100).Error
        		if err != nil {
        			return err
        	return err
        }

        6. 总结

        做为golang新手,看了很多文档、文章,好似懂了,其实啥都不懂。

        声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。