限流是指在系统中对请求或操作的流量进行控制,以防止系统过载或资源耗尽。其主要目的是保证系统的稳定性和可靠性。

原理解析

限流的基本原理是对一定时间窗口内的请求数量进行限制,从而避免系统在高并发场景下被请求淹没。常见的限流策略包括:

  1. 固定窗口限流:在固定的时间窗口内进行限流,例如每分钟允许1000个请求。如果请求数超过限制,则拒绝或延迟处理。
  2. 滑动窗口限流:将时间窗口划分为多个小时间片,每个时间片内有一个子计数器,滑动窗口通过这些子计数器进行限流。
  3. 漏桶算法:系统以固定速率处理请求,超出速率的请求会被缓存或丢弃。
  4. 令牌桶算法:系统以固定速率向桶中添加令牌,每次请求需要消耗一个令牌,当令牌耗尽时请求被拒绝或延迟,可以应对突发的流量高峰。

限流算法的实现方式

固定窗口

package main

import (
	"fmt"
	"sync"
	"time"
)

type FixedWindowLimiter struct {
	windowSize  time.Duration // 窗口大小
	maxRequests int           // 最大请求数
	requests    int           // 当前窗口内的请求数
	lastReset   int64         // 上次窗口重置时间(秒级时间戳)
	resetMutex  sync.Mutex    // 重置锁
}

func NewFixedWindowLimiter(windowSize time.Duration, maxRequests int) *FixedWindowLimiter {
	return &FixedWindowLimiter{
		windowSize:  windowSize,
		maxRequests: maxRequests,
		lastReset:   time.Now().Unix(),
	}
}

func (limiter *FixedWindowLimiter) AllowRequest() bool {
	limiter.resetMutex.Lock()
	defer limiter.resetMutex.Unlock()

	// 检查是否需要重置窗口
	if time.Now().Unix()-limiter.lastReset >= int64(limiter.windowSize.Seconds()) {
		limiter.requests = 0
		limiter.lastReset = time.Now().Unix()
	}

	// 检查请求数是否超过阈值
	if limiter.requests >= limiter.maxRequests {
		return false
	}

	limiter.requests++
	return true
}

func main() {
	limiter := NewFixedWindowLimiter(1*time.Second, 3) // 每秒最多允许3个请求
	for i := 0; i < 15; i++ {
		now := time.Now().Format("15:04:05")
		if limiter.AllowRequest() {
			fmt.Println(now + " 请求通过")
		} else {
			fmt.Println(now + " 请求被限流")
		}
		time.Sleep(100 * time.Millisecond)
	}
}

缺点

  • 限流不够平滑
  • 临界问题:假设限流阀值为5个请求,单位时间窗口是1s,如果我们在单位时间内的前0.8-1s和1-1.2s,分别并发5个请求。虽然都没有超过阀值,但是如果算0.8-1.2s,则并发数高达10,已经超过单位时间1s不超过5

滑动窗口

package main

import (
   "fmt"
   "sync"
   "time"
)

type SlidingWindowLimiter struct {
   windowSize   time.Duration // 窗口大小
   maxRequests  int           // 最大请求数
   requests     []time.Time   // 窗口内的请求时间
   requestsLock sync.Mutex    // 请求锁
}

func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
   return &SlidingWindowLimiter{
      windowSize:  windowSize,
      maxRequests: maxRequests,
      requests:    make([]time.Time, 0),
   }
}

func (limiter *SlidingWindowLimiter) AllowRequest() bool {
   limiter.requestsLock.Lock()
   defer limiter.requestsLock.Unlock()

   // 移除过期的请求
   currentTime := time.Now()
   for len(limiter.requests) > 0 && currentTime.Sub(limiter.requests[0]) > limiter.windowSize {
      limiter.requests = limiter.requests[1:]
   }

   // 检查请求数是否超过阈值
   if len(limiter.requests) >= limiter.maxRequests {
      return false
   }

   limiter.requests = append(limiter.requests, currentTime)
   return true
}

func main() {
   limiter := NewSlidingWindowLimiter(500*time.Millisecond, 2) // 每秒最多允许4个请求
   for i := 0; i < 15; i++ {
      now := time.Now().Format("15:04:05")
      if limiter.AllowRequest() {
         fmt.Println(now + " 请求通过")
      } else {
         fmt.Println(now + " 请求被限流")
      }
      time.Sleep(100 * time.Millisecond)
   }
}

漏桶

水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会超过桶可接纳的容量时直接溢出。

漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。

削峰:有大量流量进入时,会发生溢出,从而限流保护服务可用

缓冲:不至于直接请求到服务器, 缓冲压力

消费速度固定 因为计算性能固定

package main

import (
   "fmt"
   "time"
)

type LeakyBucket struct {
   rate       float64 // 漏桶速率,单位请求数/秒
   capacity   int     // 漏桶容量,最多可存储请求数
   water      int     // 当前水量,表示当前漏桶中的请求数
   lastLeakMs int64   // 上次漏水的时间戳,单位秒
}

func NewLeakyBucket(rate float64, capacity int) *LeakyBucket {
   return &LeakyBucket{
      rate:       rate,
      capacity:   capacity,
      water:      0,
      lastLeakMs: time.Now().Unix(),
   }
}

func (lb *LeakyBucket) Allow() bool {
   now := time.Now().Unix()
   elapsed := now - lb.lastLeakMs

   // 漏水,根据时间间隔计算漏掉的水量
   leakAmount := int(float64(elapsed) / 1000 * lb.rate)
   if leakAmount > 0 {
      if leakAmount > lb.water {
         lb.water = 0
      } else {
         lb.water -= leakAmount
      }
   }

   // 判断当前水量是否超过容量
   if lb.water > lb.capacity {
      lb.water-- // 如果超过容量,减去刚刚增加的水量
      return false
   }

   // 增加水量
   lb.water++

   lb.lastLeakMs = now
   return true
}

func main() {
   // 创建一个漏桶,速率为每秒处理3个请求,容量为4个请求
   leakyBucket := NewLeakyBucket(3, 4)

   // 模拟请求
   for i := 1; i <= 15; i++ {
      now := time.Now().Format("15:04:05")
      if leakyBucket.Allow() {
         fmt.Printf(now+"  第 %d 个请求通过\n", i)
      } else {
         fmt.Printf(now+"  第 %d 个请求被限流\n", i)
      }
      time.Sleep(200 * time.Millisecond) // 模拟请求间隔
   }
}

令牌桶

令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。
令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。

令牌桶限流大致的规则如下:
(1)进水口按照某个速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。

比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。

package main

import (
   "fmt"
   "sync"
   "time"
)

// TokenBucket 表示一个令牌桶。
type TokenBucket struct {
   rate       float64    // 令牌添加到桶中的速率。
   capacity   float64    // 桶的最大容量。
   tokens     float64    // 当前桶中的令牌数量。
   lastUpdate time.Time  // 上次更新令牌数量的时间。
   mu         sync.Mutex // 互斥锁,确保线程安全。
}

// NewTokenBucket 创建一个新的令牌桶,给定令牌添加速率和桶的容量。
func NewTokenBucket(rate float64, capacity float64) *TokenBucket {
   return &TokenBucket{
      rate:       rate,
      capacity:   capacity,
      tokens:     capacity, // 初始时,桶是满的。
      lastUpdate: time.Now(),
   }
}

// Allow 检查是否可以从桶中移除一个令牌。如果可以,它移除一个令牌并返回 true。
// 如果不可以,它返回 false。
func (tb *TokenBucket) Allow() bool {
   tb.mu.Lock()
   defer tb.mu.Unlock()

   // 根据经过的时间计算要添加的令牌数量。
   now := time.Now()
   elapsed := now.Sub(tb.lastUpdate).Seconds() 
   tokensToAdd := elapsed * tb.rate            

   tb.tokens += tokensToAdd
   if tb.tokens > tb.capacity {
      tb.tokens = tb.capacity // 确保令牌数量不超过桶的容量。
   }

   if tb.tokens >= 1.0 {
      tb.tokens--
      tb.lastUpdate = now
      return true
   }

   return false
}

func main() {
   tokenBucket := NewTokenBucket(2.0, 3.0)

   for i := 1; i <= 10; i++ {
      now := time.Now().Format("15:04:05")
      if tokenBucket.Allow() {
         fmt.Printf(now+"  第 %d 个请求通过\n", i)
      } else { // 如果不能移除一个令牌,请求被拒绝。
         fmt.Printf(now+"  第 %d 个请求被限流\n", i)
      }
      time.Sleep(200 * time.Millisecond)
   }
}
算法优点缺点适合场景
固定窗口– 简单直观,易于实现
– 适用于稳定的流量控制
– 易于实现速率控制
– 无法应对短时间内的突发流量
– 流量不均匀时可能导致突发流量
– 稳定的流量控制,不需要保证请求均匀分布的场景
滑动窗口– 平滑处理突发流量
– 颗粒度更小,可以提供更精确的限流控制
– 实现相对复杂
– 需要维护滑动窗口的状态
– 存在较高的内存消耗
– 需要平滑处理突发流量的场景
漏桶算法– 平滑处理突发流量
– 可以固定输出速率,有效防止过载
– 对突发流量的处理不够灵活
– 无法应对流量波动的情况
– 需要固定输出速率的场景
– 避免流量的突然增大对系统的冲击的场景
令牌桶– 平滑处理突发流量
– 可以动态调整限流规则
– 能适应不同时间段的流量变化
– 实现相对复杂
– 需要维护令牌桶的状态
– 需要动态调整限流规则的场景
– 需要平滑处理突发流量的场景

分布式限流

分布式限流是在多个服务实例或多个节点上对资源访问进行限制的过程。在微服务架构或大规模分布式系统中,由于服务可能被部署在多个服务器或者多个容器上,因此需要一种机制来协调和控制整个系统的流量,以防止系统过载和确保服务的高可用性。

  • Redis: 用于实现分布式计数器和令牌桶。
  • Google Guava RateLimiter: 在单个JVM级别提供限流功能,可用于微服务架构中的单个服务实例。
  • Nginx: 通过配置可以实现较为简单的限流。
  • Zuul、Spring Cloud Gateway: 这些API网关提供了限流功能。

不同层的限流

  • 应用层:在代码中直接实现限流逻辑。可以使用各种算法,如令牌桶、漏桶等
  • 服务器层:Web服务器配置(比如 nginx)、API 网关
  • 网络层:在负载均衡器上实现限流,可以在流量进入服务器前就进行控制

相关链接:

https://www.cnblogs.com/niumoo/p/16007224.html

https://juejin.cn/post/7383982728734097445