免费资源网 – https://freexyz.cn/

目录
  • 处理流程对比
    • net/http处理流程
    • fasthttp处理流程
  • fasthttp为什么快
    • 底层实现
      • 简单案例
      • workerPool结构
      • 启动服务
      • Start开启协程池
      • 接收连接
      • 获取 workerChan
      • 处理连接
    • 总结

      处理流程对比

      在进行了解fasthttp底层代码实现之前,我们先对两者处理请求的方式进行一个回顾和对比,了解完两者的基本的情况之后,再对fasthttp的实现最进一步分析。

      net/http处理流程

      在小许文章《图文讲透Golang标准库 net/http实现原理 — 服务端》中讲的比较详细了,这里再把大致流程整理以下,整体流程如下:

      浅析Go中fasthttp与net/http的性能对比及应用

      1. 将路由和对应的handler注册到一个 map 中,用做后续键值路由匹配

      2. 注册完之后就是开启循环监听连接,每获取到一个连接就会创建一个 Goroutine进行处理

      3. 在创建好的 Goroutine 里面会循环的等待接收请求数据,然后根据请求的地址去键值路由map中匹配对应的handler

      4. 执行匹配到的处理器handler

      net/http 的实现是一个连接新建一个 goroutine,如果在连接数非常多的时候,,每个连接都会创建一个 Goroutine 就会给系统带来一定的压力。这也就造成了 net/http在处理高并发时的瓶颈。

      每次来了一个连接,都要实例化一个连接对象,这谁受得了,哈哈

      fasthttp处理流程

      再看看fasthttp处理请求的流程:

      浅析Go中fasthttp与net/http的性能对比及应用

      1. 启动监听

      2. 循环监听端口获取连接,建立workerPool

      3. 循环尝试获取连接 net.Conn,先会去 ready 队列里获取 workerChan,获取不到就会去对象池获取

      4. 将获取到的的连接net.Conn 发送到 workerChan 的 channel 中

      5. 开启一个 Goroutine 一直循环获取 workerChan 这个 channel 中的数据

      6. 获取到channel中的net.Conn之后就会对请求进行处理

      workerChan 其实就是一个连接处理对象,这个对象里面有一个 channel 用来传递连接;每个 workerChan 在后台都会有一个 Goroutine 循环获取 channel 中的连接,然后进行处理。

      workerChan是在workerPool临时对象分别存取

      fasthttp为什么快

      fasthttp的优化主要有以下几个点:

      • 连接复用,如slice中有可复用的workerChan就从ready这个slice中获取,没有可复用的就在workerChanPool创建一个,万一池子满了(默认是 256 * 1024个)就报错。

      • 对于内存复用,就是大量使用了sync.Pool(你知道的,sync.Pool复用对象有啥好处),有人统计过,用了整整30个sync.Pool,context、request对象、header、response对象都用了sync.Pool ….

      • 利用unsafe.Pointer指针进行[]byte 和 string 转换,避免[]byte到string转换时带来的内存分配和拷贝带来的消耗 。

      知道了fasthttp为什么快,接下来我们看下它是如何处理监听处理请求的,在哪些地方用到了这些特性。

      底层实现

      简单案例

      import (
          "github.com/buaazp/fasthttprouter"
          "github.com/valyala/fasthttp"
          "log"
      )
      
      func main() {
          //创建路由
          r := fasthttprouter.New()
          r.GET("/", Index)
          if err := fasthttp.ListenAndServe(":8083", r.Handler); err != nil {
              log.Fatalf("ListenAndServe fatal: %s", err)
          }
      
      }
      func Index(ctx *fasthttp.RequestCtx) {
          ctx.WriteString("hello xiaou code!")
      }

      这个案例同样是几样代码就启动了一个服务。

      创建路由、为不同的路由执行关联不同的处理函数handler,接着跟net/http一样调用 ListenAndServe 函数进行启动服务监听,等待请求进行处理。

      workerPool结构

      workerpool 对象表示 连接处理 工作池,这样可以控制连接建立后的处理方式,而不是像标准库 net/http 一样,对每个请求连接都启动一个 goroutine 处理, 内部的 ready 字段存储空闲的 workerChan 对象,workerChanPool 字段表示管理 workerChan 的对象池。

      workerPool结构体如下:

      type workerPool struct {
          //匹配请求对应的handler
          WorkerFunc ServeHandler
          //最大同时处理的请求数
          MaxWorkersCount int
          
          LogAllErrors bool
          //最大空闲工作时间
          MaxIdleWorkerDuration time.Duration
      
          Logger Logger
          //互斥锁
          lock         sync.Mutex
          //work数量
          workersCount int
          mustStop     bool
          // 空闲的 workerChan
          ready []*workerChan
          //是否关闭workerPool
          stopCh chan struct{}
          //sync.Pool  workerChan 的对象池
          workerChanPool sync.Pool
      
          connState func(net.Conn, ConnState)
      }

      WorkerFunc :这个属性挺重要的,因为给它赋值的是Server.serveConn

      ready:存储了空闲的workerChan

      workerChanPool:是workerChan 的对象池,在sync.Pool中存取临时对象,可减少内存分配

      启动服务

      ListenAndServe是启动服务监听的入口,内部的调用过程如下:

      浅析Go中fasthttp与net/http的性能对比及应用

      Server.Serve

      Serve方法为来自给监听到的连接提供处理服务,直到超过了最大限制(256 * 1024)才会报错。

      func (s *Server) Serve(ln net.Listener) error {
          //最大连接处理数
          maxWorkersCount := s.getConcurrency()
      
          s.mu.Lock()
          s.ln = append(s.ln, ln)
          if s.done == nil {
              s.done = make(chan struct{})
          }
          if s.concurrencyCh == nil {
              s.concurrencyCh = make(chan struct{}, maxWorkersCount)
          }
          s.mu.Unlock()
          //workerPool进行初始化
          wp := &workerPool{
              WorkerFunc:            s.serveConn,
              MaxWorkersCount:       maxWorkersCount,
              LogAllErrors:          s.LogAllErrors,
              MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
              Logger:                s.logger(),
              connState:             s.setState,
          }
          //开启协程,处理协程池的清理工作
          wp.Start()
          atomic.AddInt32(&s.open, 1)
          defer atomic.AddInt32(&s.open, -1)
      
          for {
              // 阻塞等待,获取连接net.Conn
              if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
                  ...
                  return err
              }
              s.setState(c, StateNew)
              atomic.AddInt32(&s.open, 1)
              //处理获取到的连接net.Conn
              if !wp.Serve(c) {
                  //未能处理,说明已达到最大worker限制
                  ...
              }
              c = nil
          }
      }

      从上面的注释中我们可以看出 Server 方法主要做了以下几件事:

      1. 初始化 worker Pool,并启动

      2. net.Listener循环接收请求

      3. 将接收到的请求交给workerChan 处理

      注意:这里如果超过了设定的最大连接数(默认是 256 * 1024个)就直接报错了

      Start开启协程池

      workerPool进行初始化之后接着就调用Start开启,这里主要是指定sync.Pool变量workerChanPool的创建函数。

      接着开启一个协程,该Goroutine的目的是进行定时清理 workerPool 中的 ready 中保存的空闲 workerChan,清理频率为每 10s 启动一次。

      清理规则是使用二进制搜索算法找出最近可以清理的工作者的索引

      func (wp *workerPool) Start() {
          //wp的关闭channel是否为空
          if wp.stopCh != nil {
              return
          }
          wp.stopCh = make(chan struct{})
          stopCh := wp.stopCh
          //指定workerChanPool的创建函数
          wp.workerChanPool.New = func() interface{} {
              return &workerChan{
                  ch: make(chan net.Conn, workerChanCap),
              }
          }
          //开启协程
          go func() {
              var scratch []*workerChan
              for {
                  //清理空闲超时的 workerChan
                  wp.clean(&scratch)
                  select {
                  case <-stopCh:
                      return
                  default:
                      // 间隔10 s
                      time.Sleep(wp.getMaxIdleWorkerDuration())
                  }
              }
          }()
      }

      开启一个清理Goroutine的目的是为了避免在流量高峰创建了大量协程,之后不再使用,造成协程浪费。

      清理流程是在wp.clean()方法中实现的。

      接收连接

      acceptConn函数通过调用net.Listener的accept方法去接受连接,这里获取连接的方式跟net/http调用的其实都是一样的。

      func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
          for {
              c, err := ln.Accept()
              if err != nil {
                  //err判断
                  ...
              }
              //校验是否net.TCPConn连接
             // 校验每个ip对应的连接数
              if s.MaxConnsPerIP > 0 {
                  pic := wrapPerIPConn(s, c)
                  if pic == nil {
                      ...
                      continue
                  }
                  c = pic
              }
              return c, nil
          }
      }

      获取 workerChan

      func (wp *workerPool) Serve(c net.Conn) bool {
          //获取 workerChan 
          ch := wp.getCh()
          if ch == nil {
              return false
          }
          //将连接放到channel中
          ch.ch <- c
          //返回true
          return true
      }

      这里调用的getCh()函数实现了获取workerChan,获取到之后将之前接受的连接net.Conn放到workerChan结构体的channel通道中。

      我们看下workerChan这个结构体

      type workerChan struct {
          lastUseTime time.Time
          ch          chan net.Conn
      }

      lastUseTime:最后一次被使用的时间,这个值在进行清理workerChan的时候是会用到的

      ch:用来传递获取到的连接net.Conn,获取到连接时接收,处理请求时获取

      getCh方法:

      func (wp *workerPool) getCh() *workerChan {
          var ch *workerChan
          createWorker := false
      
          wp.lock.Lock()
          //从ready队列中拿workerChan
          ready := wp.ready
          n := len(ready) - 1
          if n < 0 {
              if wp.workersCount < wp.MaxWorkersCount {
                  createWorker = true
                  wp.workersCount++
              }
          } else {
              //ready队列不为空,从队尾拿workerChan
              ch = ready[n]
              //队尾置为nil
              ready[n] = nil
              //重新将ready赋值给wp.ready
              wp.ready = ready[:n]
          }
          wp.lock.Unlock()
          //ready中获取不到workerChan,则从对象池中新建一个
          if ch == nil {
              if !createWorker {
                  return nil
              }
              vch := wp.workerChanPool.Get()
              ch = vch.(*workerChan)
              //开启一个goroutine执行
              go func() {
                  //处理ch中channel中的数据
                  wp.workerFunc(ch)
                  //处理完后将workerChan放回对象池
                  wp.workerChanPool.Put(vch)
              }()
          }
          return ch
      }

      浅析Go中fasthttp与net/http的性能对比及应用

      getCh()方法的目的就是获取workerChan,流程如下:

      • 先会去 ready 空闲队列中获取 workerChan

      • ready 获取不到则从对象池中创建一个新的 workerChan

      • 并启动 Goroutine 用来处理 channel 中的数据

      workPool中的ready是一个FILO的栈,每次从队尾取出workChan

      处理连接

      func (wp *workerPool) workerFunc(ch *workerChan) {
          var c net.Conn
      
          var err error
          for c = range ch.ch {
              //channel的值是nil,退出
              if c == nil {
                  break
              }
              //执行请求,并处理
              if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
                  ...
              }
              ...
              //将当前workerChan放入ready队列
              if !wp.release(ch) {
                  break
              }
          }
      
          wp.lock.Lock()
          wp.workersCount--
          wp.lock.Unlock()
      }

      执行流程

      • 先遍历workerChan的channel,看是否有连接net.Conn

      • 获取到连接之后就执行WorkerFunc 函数处理请求

      • 请求处理完之后将当前workerChan放入ready队列

      WorkerFunc 函数实际上是 Server 的 serveConn 方法

      一开始开代码的时候我还没发现呢,细看了之后在Server.Serve()启动服务时将Server.serveConn()方法赋值给了workerPool的WorkerFunc()。

      浅析Go中fasthttp与net/http的性能对比及应用

      要想了解实现的朋友可以搜下这方面的代码

      func (s *Server) ServeConn(c net.Conn) error {
          ...
          err := s.serveConn(c)
          ...
      }

      里面的代码会比较多,不过里面的流程就是是获取到请求的参数,找到对应的 handler 进行请求处理,然后返回 响应给客户端。

      这里的实现代码可以看到context、request对象的sync.Pool实现,这里就不一一贴出来了。

      总结

      fasthttp和net/http在实现上还是有较大区别,通过对实现原理的分析,知道了fasthttp速度快是利用了大量sync.Pool对象复用 、[]byte 和 string利用万能指针unsafe.Pointer进行转换等优化技巧。

      如果你的业务需要支撑较高的 QPS 并且保持一致的低延迟时间,那么采用 fasthttp 是一个较好的选择。不过net/http兼容性更高,在多数情况下反而是更好的选择!

      以上就是浅析Go中fasthttp与net/http的性能对比及应用的详细内容,更多关于Go fasthttp的资料请关注其它相关文章!

      免费资源网 – https://freexyz.cn/

      声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。