17611538698
webmaster@21cto.com

使用Go语言,25秒读取16GB文件

资讯 0 3308 2021-06-17 04:59:28
<p><strong>导读:当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些文件通常位于本地磁盘中。</strong></p> <p><img alt="" src="https://www.21cto.com/uploads/images/b79paxxicaau0cn.jpeg" style="width: 100%; height: 100%;" /><br /> 我们将使用Go语言,从一个大小为16GB的.txt或.log文件中提取日志。让我们开始编码&hellip;&hellip;</p> <p>首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go os.File。</p> <pre> <code class="language-python">f, err := os.Open(fileName)  if err != nil {    fmt.Println("cannot able to read the file", err)    return  } // UPDATE: close after checking error defer file.Close()  //Do not forget to close the file</code></pre> <p><br /> 打开文件后,我们有以下两个选项可以选择:</p> <p>逐行读取文件,这有助于减少内存紧张,但需要更多的时间。一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。</p> <p>由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。</p> <p>但你猜怎么着,还有第三种选择。瞧&hellip;&hellip;相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。</p> <pre> <code class="language-java">r := bufio.NewReader(f) for { buf := make([]byte,4*1024) //the chunk size n, err := r.Read(buf) //loading chunk into buffer    buf = buf[:n] if n == 0 {          if err != nil {        fmt.Println(err)        break      }      if err == io.EOF {        break      }      return err   } }</code></pre> <p><br /> 一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为:</p> <p>&nbsp;</p> <pre> <code class="language-java">//sync pools to reuse the memory and decrease the preassure on Garbage Collector linesPool := sync.Pool{New: func() interface{} {         lines := make([]byte, 500*1024)         return lines }} stringPool := sync.Pool{New: func() interface{} {           lines := ""           return lines }} slicePool := sync.Pool{New: func() interface{} {            lines := make([]string, 100)            return lines }} r := bufio.NewReader(f) var wg sync.WaitGroup //wait group to keep track off all threads for {            buf := linesPool.Get().([]byte)      n, err := r.Read(buf)      buf = buf[:n] if n == 0 {         if err != nil {             fmt.Println(err)             break         }         if err == io.EOF {             break         }         return err      } nextUntillNewline, err := r.ReadBytes('\n')//read entire line            if err != io.EOF {          buf = append(buf, nextUntillNewline...)      }            wg.Add(1)      go func() {                 //process each chunk concurrently         //start -&gt; log start time, end -&gt; log end time                  ProcessChunk(buf, &amp;linesPool, &amp;stringPool, &amp;slicePool,     start, end) wg.Done()            }() } wg.Wait() }</code></pre> <p><br /> 上面的代码,引入了两个优化点:</p> <p>sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。</p> <p>现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。</p> <p>2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n<br /> 我们将根据命令行提供的时间戳提取日志。</p> <pre> <code class="language-java">func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) { //another wait group to process every chunk further                                    var wg2 sync.WaitGroup logs := stringPool.Get().(string) logs = string(chunk) linesPool.Put(chunk) //put back the chunk in pool //split the string by "\n", so that we have slice of logs       logsSlice := strings.Split(logs, "\n") stringPool.Put(logs) //put back the string pool chunkSize := 100 //process the bunch of 100 logs in thread n := len(logsSlice) noOfThread := n / chunkSize if n%chunkSize != 0 { //check for overflow           noOfThread++       } length := len(logsSlice) //traverse the chunk      for i := 0; i &lt; length; i += chunkSize {                    wg2.Add(1) //process each chunk in saperate chunk          go func(s int, e int) {             for i:= s; i&lt;e;i++{                text := logsSlice[i] if len(text) == 0 {                   continue                }                         logParts := strings.SplitN(text, ",", 2)             logCreationTimeString := logParts[0]             logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString) if err != nil {                  fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)                  return             } // check if log's timestamp is inbetween our desired period           if logCreationTime.After(start) &amp;&amp; logCreationTime.Before(end) {                        fmt.Println(text)            }         }         textSlice = nil         wg2.Done()            }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))    //passing the indexes for processing }      wg2.Wait() //wait for a chunk to finish    logsSlice = nil }</code></pre> <p><br /> 对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。</p> <p>完整的代码示例如下:</p> <pre> <code class="language-java">func main() {  s := time.Now()  args := os.Args[1:]  if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"   fmt.Println("Please give proper command line arguments")   return  }  startTimeArg := args[1]  finishTimeArg := args[3]  fileName := args[5]  file, err := os.Open(fileName)    if err != nil {   fmt.Println("cannot able to read the file", err)   return  }    defer file.Close() //close after checking err    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)  if err != nil {   fmt.Println("Could not able to parse the start time", startTimeArg)   return  }  queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)  if err != nil {   fmt.Println("Could not able to parse the finish time", finishTimeArg)   return  }  filestat, err := file.Stat()  if err != nil {   fmt.Println("Could not able to get the file stat")   return  }  fileSize := filestat.Size()  offset := fileSize - 1  lastLineSize := 0  for {   b := make([]byte, 1)   n, err := file.ReadAt(b, offset)   if err != nil {    fmt.Println("Error reading file ", err)    break   }   char := string(b[0])   if char == "\n" {    break   }   offset--   lastLineSize += n  }  lastLine := make([]byte, lastLineSize)  _, err = file.ReadAt(lastLine, offset+1)  if err != nil {   fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)   return  }  logSlice := strings.SplitN(string(lastLine), ",", 2)  logCreationTimeString := logSlice[0]  lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)  if err != nil {   fmt.Println("can not able to parse time : ", err)  }  if lastLogCreationTime.After(queryStartTime) &amp;&amp; lastLogCreationTime.Before(queryFinishTime) {   Process(file, queryStartTime, queryFinishTime)  }  fmt.Println("\nTime taken - ", time.Since(s)) } func Process(f *os.File, start time.Time, end time.Time) error {  linesPool := sync.Pool{New: func() interface{} {   lines := make([]byte, 250*1024)   return lines  }}  stringPool := sync.Pool{New: func() interface{} {   lines := ""   return lines  }}  r := bufio.NewReader(f)  var wg sync.WaitGroup  for {   buf := linesPool.Get().([]byte)   n, err := r.Read(buf)   buf = buf[:n]   if n == 0 {    if err != nil {     fmt.Println(err)     break    }    if err == io.EOF {     break    }    return err   }   nextUntillNewline, err := r.ReadBytes('\n')   if err != io.EOF {    buf = append(buf, nextUntillNewline...)   }   wg.Add(1)   go func() {    ProcessChunk(buf, &amp;linesPool, &amp;stringPool, start, end)    wg.Done()   }()  }  wg.Wait()  return nil } func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {  var wg2 sync.WaitGroup  logs := stringPool.Get().(string)  logs = string(chunk)  linesPool.Put(chunk)  logsSlice := strings.Split(logs, "\n")  stringPool.Put(logs)  chunkSize := 300  n := len(logsSlice)  noOfThread := n / chunkSize  if n%chunkSize != 0 {   noOfThread++  }  for i := 0; i &lt; (noOfThread); i++ {   wg2.Add(1)   go func(s int, e int) {    defer wg2.Done() //to avaoid deadlocks    for i := s; i &lt; e; i++ {     text := logsSlice[i]     if len(text) == 0 {      continue     }     logSlice := strings.SplitN(text, ",", 2)     logCreationTimeString := logSlice[0]     logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)     if err != nil {      fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)      return     }     if logCreationTime.After(start) &amp;&amp; logCreationTime.Before(end) {      //fmt.Println(text)     }    }       }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))  }  wg2.Wait()  logsSlice = nil } </code></pre> <p>&nbsp;</p> <p>&nbsp;</p> <blockquote> <p>来源:分布式实验室</p> <p>原文:https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2</p> </blockquote>

评论