基于Redis+Lua实现分布式限流组件

limit-spring-boot-starter

limit-spring-boot-starter是一个基于springboot starter机制,结合SPI 接口设计思想(内部集成:Redis+Lua实现限流算法(令牌桶,固定窗口,滑动窗口)以及限流回退默认实现),支持注解方式/配置文件方式接入限流,扩展方便,集成使用简单的分布式限流组件
开源地址:
https://gitee.com/javacoo/limit-spring-boot-starter

背景介绍

业务背景

1、随着业务的快速发展,对接的第三方合作机构越来越多,对外提供服务API访问量成倍增加,导致服务器压力也不断增加,而服务器资源是有限的,当请求量达到设计的极限时,如果不采取措施,轻则导致服务响应时间变长,重则可能造成整个系统瘫痪。

生产环境背景

1、账单日批量业务接口访问量暴增,特别是某个时间段
2、业务方调用接口的速度未知,QPS可能达到400/s,600/s,或者更高
3、对外服务API性能上限是 QPS 300/s
4、已经出现服务不可用,应用崩溃的事故

需求分析

1、鉴于业务方对接口的调用频率未知,而我方的接口服务有上限,为保证服务的可用性,业务层需要对接口调用方的流量进行限制—–接口限流。

2、尽量少改或者不改造已有功能:少侵入或者0侵入式开发。

3、扩展方便,集成简单,开发速率高,使用简单。

设计思路

主流思路

  • 在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。
  • 常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
  • 如果是单节点我们可以使用google为我们提供的guava包下的RateLimiter进行限流,它使用的是令牌桶算法,分布式场景下也可以使用网关进行限流,如Spring Clound Gateway,其实还有很多开源的限流框架如阿里的Sentinel,甚至我们可以利用redis+lua脚本自己来实现限流。

前面的话

在实际应用时也不要太纠结算法问题,因为一些限流算法实现是一样的只是描述不一样;具体使用哪种限流技术还是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫 :)

我的思路

组件基于springboot starter机制,结合SPI 接口设计思想(内部集成:Redis+Lua实现限流算法(令牌桶,固定窗口,滑动窗口)以及限流回退默认实现),支持注解方式/配置文件方式接入限流,主要分为以下部分:

  • 新建 springboot starter工程
  • 基于SPI思想设计扩展接口
  • redis+lua实现分布式限流
  • 支持接口添加注解方式限流
  • 支持配置文件方式限流
  • 支持限流回退

具体实现见 实施步骤 一节,由于实施步骤较多,故放在后面章节,我们先来看看如何集成及使用。

集成及使用

集成

<dependency>
   <groupId>com.javacoo</groupId>
   <artifactId>limit-spring-boot-starter</artifactId>
   <version>1.0.0</version>
</dependency>

使用

  • 配置文件接入限流 主要配置说明,详见 LimitConfig 限流配置,覆盖原则:有方法级独立配置则使用独立配置,否则使用全局配置,当配置了 limit.expression限流表达式则激活了配置文件接入限流,注解方式失效。
#全局配置
#限流表达式:拦截 com.javacoo.service.example.service 包及子包下所有方法
limit.expression=execution(* com.javacoo.service.example.service..*.*(..))
#全局限流回退实现名称
limit.limit-rule.fallback-impl=globalFallback

#方法级配置:针对getExampleInfo方法 配置独立的限流规则
#给定的时间范围 单位(秒)
limit.limit-method-map[getExampleInfo].period = 60
#一定时间内最多访问次数
limit.limit-method-map[getExampleInfo].count = 5
#限流类型
limit.limit-method-map[getExampleInfo].limitType = CUSTOMER
#降级策略
limit.limit-method-map[getExampleInfo].fallbackStrategy = FALLBACK
#当降级策略为:回退 时回退处理接口实现名称
limit.limit-method-map[getExampleInfo].fallbackImpl = getExampleInfoFallback
  • 使用注解方式接入限流:默认配置情况
 //60秒内,允许访问1次
  @Limit(period = 60,count = 1)
  public Optional<ExampleDto> getExampleInfo(String id) {
      AbstractAssert.isNotBlank(id, ErrorCodeConstants.SERVICE_GET_EXAMPLE_INFO_ID);
        ExampleDto exampleDto = new ExampleDto();
        exampleDto.setId("1");
        exampleDto.setData("正常数据");
        return Optional.ofNullable(exampleDto);
  }

限流效果:正常访问

2021-08-11 15:13:38.553  INFO 16452 --- [           main] c.j.l.c.i.redis.RedisLuaRateLimiter      : [限流交易请求],key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO],60秒内,已访问次数:1,60秒内,限制次数:1
2021-08-11 15:13:38.553  INFO 16452 --- [           main] c.j.l.c.handler.AbstractLimitHandler     : [限流交易请求],尝试获取执行权限成功,开始执行目标方法

限流效果:降级策略:FAIL_FAST

2021-08-11 15:14:14.413  INFO 16192 --- [           main] c.j.l.c.i.redis.RedisLuaRateLimiter      : [限流交易请求],key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO],60秒内,已访问次数:2,60秒内,限制次数:1
2021-08-11 15:14:14.414  INFO 16192 --- [           main] c.j.l.c.handler.AbstractLimitHandler     : [限流交易请求],尝试获取执行权限失败,服务降级处理,降级策略:FAIL_FAST
2021-08-11 15:14:14.416 ERROR 16192 --- [           main] c.j.l.c.handler.AbstractLimitHandler     : [限流服务执行异常]
com.javacoo.limit.client.exception.LimitException: 访问过于频繁,超出访问限制
...

使用注解接入限流:回退策略情况

//60秒内,允许访问1次,回退策略,指定回退处理类
@Limit(period = 60,count = 1,fallbackStrategy = FallbackStrategy.FALLBACK,fallbackImpl = "getExampleInfoFallback")
public Optional<ExampleDto> getExampleInfo(String id) {
      AbstractAssert.isNotBlank(id, ErrorCodeConstants.SERVICE_GET_EXAMPLE_INFO_ID);
        ExampleDto exampleDto = new ExampleDto();
        exampleDto.setId("1");
        exampleDto.setData("正常数据");
        return Optional.ofNullable(exampleDto);
  }

限流效果:正常访问

2021-08-11 15:22:44.613  INFO 16720 --- [           main] c.j.l.c.i.redis.RedisLuaRateLimiter      : [限流交易请求],key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO],60秒内,已访问次数:1,60秒内,限制次数:1
2021-08-11 15:22:44.614  INFO 16720 --- [           main] c.j.l.c.handler.AbstractLimitHandler     : [限流交易请求],尝试获取执行权限成功,开始执行目标方法
...
 限流效果:降级策略:FALLBACK
2021-08-11 15:23:09.497  INFO 8592 --- [           main] c.j.l.c.handler.AnnotationLimitHandler   : [AnnotationLimitHandler限流交易请求],尝试获取方法:com.javacoo.service.example.service.impl.ExampleServiceImpl.getExampleInfo,执行权限
2021-08-11 15:23:09.666  INFO 8592 --- [           main] c.j.l.c.i.redis.RedisLuaRateLimiter      : [限流交易请求],key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO],60秒内,已访问次数:2,60秒内,限制次数:1
2021-08-11 15:23:09.666  INFO 8592 --- [           main] c.j.l.c.handler.AbstractLimitHandler     : [限流交易请求],尝试获取执行权限失败,服务降级处理,降级策略:FALLBACK
2021-08-11 15:23:09.668  INFO 8592 --- [           main] c.j.s.e.fallback.GetExampleInfoFallback  : getExampleInfo方法降级处理
...

限流回退扩展实现

基于xkernel 提供的SPI机制(详见:
https://gitee.com/javacoo/xkernel
),扩展非常方便,大致步骤如下:
1、 实现限流回退接口:如
com.javacoo.service.example.fallback.GetExampleInfoFallback,com.javacoo.service.example.fallback.GlobalFallback

2、配置限流回退接口:

  • 在项目resource目录新建包->META-INF->services
  • 创建com.javacoo.limit.client.api.Fallback文件,文件内容:实现类的全局限定名,如:
globalFallback=com.javacoo.service.example.fallback.GlobalFallback
getExampleInfoFallback=com.javacoo.service.example.fallback.GetExampleInfoFallback
基于Redis+Lua实现分布式限流组件

 - 修改配置文件,添加如下内容:   ```properties   #全局配置   limit.limit-rule.fallback-impl=globalFallback   #方法级配置   limit.limit-method-map[getExampleInfo].fallbackImpl = getExampleInfoFallback   ```

全局实现

/**
 * 全局方法降级处理接口实现
 * <li></li>
 */
@Slf4j
public class GlobalFallback implements Fallback<Object> {
    /**
     * 服务降级处理
     * <li></li>
     * @return: R 返回对象
     */
    @Override
    public Object getFallback() {
        log.info("全局降级处理");
        return null;
    }
}

指定方法实现

/**
 * getExampleInfo方法降级处理接口实现
 * <li></li>
 */
@Slf4j
public class GetExampleInfoFallback implements Fallback<Optional<ExampleDto>> {
    /**
     * 服务降级处理
     * <li></li>
     *
     * @return: java.lang.Object 返回对象
     */
    @Override
    public Optional<ExampleDto> getFallback() {
        log.info("getExampleInfo方法降级处理");
        ExampleDto exampleDto = new ExampleDto();
        exampleDto.setData("请求过多,请稍后再试");
        return Optional.ofNullable(exampleDto);
    }
}

实施步骤

1,新建limit-spring-boot-starter工程

  • 工程结构
基于Redis+Lua实现分布式限流组件

limit.png

  • 类结构图
基于Redis+Lua实现分布式限流组件

RateLimiter.png

  • 项目结构
limit-spring-boot-starter
 └── src
    ├── main  
    │ ├── java  
    │ │   └── com.javacoo
    │ │   ├────── limit
    │ │   │         ├──────client
    │   │   │         │         ├── api
    │   │   │         │         │    ├── Fallback 服务降级回退处理接口
    │   │   │         │         │    └── RateLimiter 限流接口
    │   │   │         │         ├── annotation
    │   │   │         │         │    └── Limit 限流注解
    │   │   │         │         ├── config
    │   │   │         │         │    ├── LimitRule 限流规则配置
    │   │   │         │         │    └── LimitConfig 限流配置
    │   │   │         │         ├── enums
    │   │   │         │         │    ├── FallbackStrategy 降级策略
    │   │   │         │         │    └── LimitType 限流类型
    │   │   │         │         ├── exception
    │   │   │         │         │    └── LimitException 限流异常
    │   │   │         │         ├── util
    │   │   │         │         │    └── WebUtil 工具类
    │   │   │         │         ├── handler
    │   │   │         │         │    ├── AbstractLimitHandler 抽象限流处理器
    │   │   │         │         │    ├── AnnotationLimitHandler 限流注解处理器
    │   │   │         │         │    ├── ConfigLimitHandler 限流配置处理器
    │   │   │         │         │    └── LimitPointcutAdvisor 限流切面Advisor
    │   │   │         │         └── internal 接口内部实现
    │   │   │         │              ├── redis
    │   │   │         │              │    ├── DefaultFallback 服务降级回退处理接口默认实现
    │   │   │         │              └── redis
    │   │   │         │                   ├── LimitRedisConfig RedisTemplate配置类
    │   │   │         │                   ├── AbstractRateLimiter 抽象限流接口实现
    │   │   │         │                   ├── FixedWindowRateLimiter 固定窗口算法实现类
    │   │   │         │                   ├── LeakyBucketRateLimiter 漏桶算法实现类
    │   │   │         │                   ├── SlidingWindowRateLimiter 滑动窗口算法实现类
    │   │   │         │                   └── TokenBucketRateLimiter 令牌桶算法实现类
    │ │   │         └──────starter
    │   │   │                   ├── LimitAutoConfiguration 自动配置类
    │   │   │                   └── RateLimiterHolder 分布式限流接口对象持有者
    │ └── resource  
    │     ├── META-INF
    │       │      ├── spring.factories
    │       │      └── ext
    │     │           └── internal
    │     │                   ├── com.javacoo.limit.client.api.Fallback
    │     │                   └── com.javacoo.limit.client.api.RateLimiter
    │     └── script
    │             ├── FixedWindow.lua
    │             ├── LeakyBucket.lua
    │             ├── SlidingWindow.lua
    │             └── TokenBucket.lua
    └── test  测试

2,基于SPI思想设计扩展接口

限流接口->
com.javacoo.limit.client.api.RateLimiter

/**
 * 限流接口
 * <p>说明:</p>
 * <li></li>
 */
@Spi(LimitConfig.DEFAULT_IMPL)
public interface RateLimiter {
    /** 默认时间单位:秒 */
    TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    /** 默认限流时间范围:1 */
    int DEFAULT_PERIOD = 1;
    /** 默认限流数量: 10 */
    int DEFAULT_LIMIT_COUNT = 10;
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     * @param keys key
     * @param count 限制数量
     * @return boolean 是否成功
     */
    boolean tryAcquire(ImmutableList<String> keys ,int count);
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     * @param keys key
     * @param count 限制数量
     * @param period 时间周期
     * @return boolean 是否成功
     */
    boolean tryAcquire(ImmutableList<String> keys ,int count,int period);
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     * @param keys key
     * @param count 限制数量
     * @param period 时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    boolean tryAcquire(ImmutableList<String> keys ,int count,int period,TimeUnit timeUnit);
}

服务降级回退处理接口->
com.javacoo.limit.client.api.Fallback

/**
 * 服务降级回退处理接口
 * <li></li>
 */
@Spi(LimitConfig.DEFAULT_IMPL)
public interface Fallback<R> {
    /**
     * 服务降级处理
     * <li></li>
     * @return: R 返回对象
     */
    default R getFallback(){ return null;}
}

redis+lua实现分布式限流(内部扩展实现)

RedisTemplate配置类

/**
 * RedisTemplate配置类
 * <li></li>
 */
@Slf4j
@Configuration
public class LimitRedisConfig {
    private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
    @Bean
    public RedisTemplate<String, Serializable> limitRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        FastJsonRedisSerializer<Serializable> fastJsonRedisSerializer = new FastJsonRedisSerializer(Serializable.class);
        FastJsonConfig fastJsonConfig = new FastJsonConfig();
        fastJsonConfig.setDateFormat(DATA_FORMAT);
        fastJsonRedisSerializer.setFastJsonConfig(fastJsonConfig);
        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        // 设置键(key)的序列化采用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

抽象限流接口实现

/**
 * 抽象限流接口实现
 * <li></li>
 */
@Slf4j
public abstract class AbstractRateLimiter implements RateLimiter {
    /**
     * RedisScript
     */
    protected RedisScript<Long> redisLuaScript;
    /**
     * RedisTemplate
     */
    @Autowired
    protected RedisTemplate<String, Serializable> redisTemplate;
    @PostConstruct
    public void initLUA() {
        redisLuaScript = new DefaultRedisScript<>(buildLuaScript(), Long.class);
    }

    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param count 限制数量
     * @return boolean 是否成功
     */
    @Override
    public boolean tryAcquire(ImmutableList<String> keys , int count) {
        return tryAcquire(keys,count,RateLimiter.DEFAULT_PERIOD);
    }

    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param count  限制数量
     * @param period 时间周期
     * @return boolean 是否成功
     */
    @Override
    public boolean tryAcquire(ImmutableList<String> keys ,int count, int period) {
        return tryAcquire(keys,count,period,RateLimiter.DEFAULT_TIME_UNIT);
    }

    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param count    限制数量
     * @param period   时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    @Override
    public boolean tryAcquire(ImmutableList<String> keys ,int count, int period, TimeUnit timeUnit) {
        return acquire(keys,count,period,timeUnit);
    }
    /**
     * 构建lua脚本
     * <li></li>
     * @return: java.lang.String
     */
    protected abstract String buildLuaScript();
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     * @param keys key
     * @param limitCount    限制数量
     * @param limitPeriod   时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    protected abstract boolean acquire(ImmutableList<String> keys ,int limitCount, int limitPeriod, TimeUnit timeUnit);
    /**
     * 加载lua脚本
     * <li></li>
     * @param path:
     * @return: java.lang.String
     */
    protected String loadLuaScript(String path){
        try {
            PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(this.getClass().getClassLoader());
            Resource[] resource = resolver.getResources(path);
            String luaScriptContent = StreamUtils.copyToString(resource[0].getInputStream(), StandardCharsets.UTF_8);
            log.info("完成加载Lua脚本:{}",luaScriptContent);
            return luaScriptContent;
        } catch (IOException ioException) {
            ioException.printStackTrace();
            throw new LimitException("加载Lua脚本异常",ioException);
        }
    }
}

固定窗口算法实现类

**
 * 分布式限流 固定窗口算法实现类
 * <p>说明:</p>
 * <li>固定窗口,一般来说,如非时间紧迫,不建议选择这个方案,太过生硬。但是,为了能快速止损眼前的问题可以作为临时应急的方案</li>
 */
@Slf4j
public class FixedWindowRateLimiter extends AbstractRateLimiter {
    /**
     * 构建lua脚本
     * <li></li>
     * @return: java.lang.String
     */
    @Override
    protected String buildLuaScript(){
        return loadLuaScript( "classpath:script/FixedWindow.lua");
    }
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param limitCount    限制数量
     * @param limitPeriod   时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    @Override
    protected boolean acquire(ImmutableList<String> keys ,int limitCount, int limitPeriod, TimeUnit timeUnit){
        Long count = redisTemplate.execute(redisLuaScript, keys,limitCount,limitPeriod);
        log.info("[固定窗口限流交易请求],key:{},返回:{},{}秒内,限制次数:{}",keys,count,limitPeriod,limitCount);
        if (count != null && count.intValue() == 1) {
            return true;
        } else {
            return false;
        }
    }
}

FixedWindow.lua

--限流KEY
local key = KEYS[1]
--限流大小
local limit = tonumber(ARGV[1])
--时间周期
local period = tonumber(ARGV[2])

local current = tonumber(redis.call('get', key) or "0")
--如果超出限流大小
if current + 1 > limit then
    return 0
else
    redis.call("incr", key)
    if current == 1 then
        redis.call("expire", key,period)
    end
    return 1
end
  • 漏桶算法实现类

滑动窗口算法实现类

/**
 * 分布式限流 滑动窗口算法实现类
 * <p>说明:</p>
 * <li>滑动窗口。这个方案适用于对异常结果「高容忍」的场景,毕竟相比“两窗”少了一个缓冲区。但是,胜在实现简单</li>
 */
@Slf4j
public class SlidingWindowRateLimiter extends AbstractRateLimiter {
    /**
     * 构建lua脚本
     * <li></li>
     * @return: java.lang.String
     */
    @Override
    protected String buildLuaScript(){
        return loadLuaScript( "classpath:script/SlidingWindow.lua");
    }
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param limitCount    限制数量
     * @param limitPeriod   时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    @Override
    protected boolean acquire(ImmutableList<String> keys ,int limitCount, int limitPeriod, TimeUnit timeUnit){
        Long count = redisTemplate.execute(redisLuaScript, keys,"1",limitCount,limitPeriod);
        log.info("[滑动窗口限流交易请求],key:{},{}秒内,返回数量:{},{}秒内,限制次数:{}",keys,limitPeriod,count,limitPeriod,limitCount);
        if (count != null && count.intValue() > 0) {
            return true;
        } else {
            return false;
        }
    }
}

SlidingWindow.lua

local function addToQueue(x,time)
    local count=0
    for i=1,x,1 do
        redis.call('lpush',KEYS[1],time)
        count=count+1
    end
    return count
end
--返回
local result=0
--限流KEY
local key = KEYS[1]
--申请数
local applyCount = tonumber(ARGV[1])
--阀值数量
local limit = tonumber(ARGV[2])
--阀值时间
local period = tonumber(ARGV[3])
redis.replicate_commands()
local now = redis.call('time')[1]
redis.call('SET','now',now);
--当前时间
local current_time = now

local timeBase = redis.call('lindex',key, limit - applyCount)
if (timeBase == false) or (tonumber(current_time) - tonumber(timeBase) > period) then
    result = result + addToQueue(applyCount,tonumber(current_time))
end
if (timeBase ~= false) then
    redis.call('ltrim',key,0,limit)
end
return result

令牌桶算法实现类

/**
 * 分布式限流 令牌桶算法实现类
 * <p>说明:</p>
 * <li>令牌桶。当你需要尽可能的压榨程序的性能(此时桶的最大容量必然会大于等于程序的最大并发能力),并且所处的场景流量进入波动不是很大(不至于一瞬间取完令牌,压垮后端系统)</li>
 */
@Slf4j
public class TokenBucketRateLimiter extends AbstractRateLimiter {
    /**
     * 构建lua脚本
     * <li></li>
     */
    @Override
    protected String buildLuaScript(){
        return loadLuaScript( "classpath:script/TokenBucket.lua");
    }
    /**
     * 尝试获取
     * <p>说明:</p>
     * <li></li>
     *
     * @param keys key
     * @param limitCount    限制数量
     * @param limitPeriod   时间周期
     * @param timeUnit 时间周期单位
     * @return boolean 是否成功
     */
    @Override
    protected boolean acquire(ImmutableList<String> keys ,int limitCount, int limitPeriod, TimeUnit timeUnit){
        Long count = redisTemplate.execute(redisLuaScript, keys,limitCount,limitPeriod);
        log.info("[令牌桶限流交易请求],key:{},返回:{},{}秒内,限制次数:{}",keys,count,limitPeriod,limitCount);
        if (count != null && count.intValue() == 1) {
            return true;
        } else {
            return false;
        }
    }
}

TokenBucket.lua

--利用redis的hash结构,存储key所对应令牌桶的上次获取时间和上次获取后桶中令牌数量
local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token_num')
local last_time = ratelimit_info[1]
local current_token_num = tonumber(ratelimit_info[2])

redis.replicate_commands()
local now = redis.call('time')[1]
redis.call('SET','now',now);

--tonumber是将value转换为数字,此步是取出桶中最大令牌数、生成令牌的速率(每秒生成多少个)、当前时间
local max_token_num = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
--local current_time = tonumber(ARGV[3])
local current_time = now
--reverse_time 即多少毫秒生成一个令牌
local reverse_time = 1000/token_rate

--如果current_token_num不存在则说明令牌桶首次获取或已过期,即说明它是满的
if current_token_num == nil then
  current_token_num = max_token_num
  last_time = current_time
else
  --计算出距上次获取已过去多长时间
  local past_time = current_time-last_time
  --在这一段时间内可产生多少令牌
  local reverse_token_num = math.floor(past_time/reverse_time)
  current_token_num = current_token_num +reverse_token_num
  last_time = reverse_time * reverse_token_num + last_time
  if current_token_num > max_token_num then
    current_token_num = max_token_num
  end
end

local result = 0
if(current_token_num > 0) then
  result = 1
  current_token_num = current_token_num - 1
end

--将最新得出的令牌获取时间和当前令牌数量进行存储,并设置过期时间
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token_num',current_token_num)
redis.call('pexpire',KEYS[1],math.ceil(reverse_time*(max_token_num - current_token_num)+(current_time-last_time)))

return result

4,支持接口添加注解方式限流

自定义限流注解

/**
 * 自定义限流注解
 * <p>说明:</p>
 * <li></li>
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Limit {
    /**
     * 名字
     */
    String name() default "";

    /**
     * key
     */
    String key() default "";

    /**
     * Key的前缀
     */
    String prefix() default "";

    /**
     * 给定的时间范围 单位(秒)
     */
    int period();

    /**
     * 一定时间内最多访问次数
     */
    int count();

    /**
     * 限流的类型(用户自定义key 或者 请求ip)
     */
    LimitType limitType() default LimitType.CUSTOMER;

    /**
     * 降级策略:默认快速失败
     */
    FallbackStrategy fallbackStrategy() default FallbackStrategy.FAIL_FAST;

    /**
     * 当降级策略为:回退 时回退处理接口实现名称
     */
    String fallbackImpl() default "";
}

限流注解处理器

/**
 * 限流注解处理器
 * <li>基于aspectj</li>
 */
@Slf4j
@Aspect
@Component
public class AnnotationLimitHandler extends AbstractLimitHandler<Object>{

    @Around("@annotation(limit)")
    public Object around(ProceedingJoinPoint joinPoint, Limit limit) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        // 获取被拦截的方法
        Method method = signature.getMethod();
        // 获取被拦截的类名
        String className = signature.getDeclaringType().getName();
        // 获取被拦截的方法名
        String methodName = method.getName();
        String defaultKey = StringUtils.join(className,".", methodName);
        log.info("[AnnotationLimitHandler限流交易请求],尝试获取方法:{},执行权限", defaultKey);
        //组装限流规则
        LimitRule limitRule = LimitRule.builder()
            .defaultKey(defaultKey)
            .count(limit.count())
            .key(limit.key())
            .limitType(limit.limitType())
            .period(limit.period())
            .prefix(limit.prefix())
            .fallbackStrategy(limit.fallbackStrategy())
            .fallbackImpl(limit.fallbackImpl())
            .build();
        return handle(limitRule, () -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                log.error("[AnnotationLimitHandler限流交易请求],方法执行异常",throwable);
            }
            return null;
        });
    }
}

5,支持配置文件方式限流

接口限流配置

/**
 * 接口限流配置
 */
@ConfigurationProperties(prefix = LimitConfig.PREFIX)
public class LimitConfig {
    /** 前缀 */
    public static final String PREFIX = "limit";
    /** 前缀 */
    public static final String EXP = "expression";
    /** limit是否可用,默认值*/
    public static final String ENABLED = "enabled";
    /** 默认实现,默认值*/
    public static final String DEFAULT_IMPL= "default";
    /** PointCut表达式,默认值*/
    public static final String DEFAULT_EXP= "";
    /** limit是否可用*/
    private String enabled = ENABLED;
    /**实现*/
    private String impl = DEFAULT_IMPL;
    /**PointCut表达式*/
    private String expression = DEFAULT_EXP;
    /**全局限流规则配置*/
    @NestedConfigurationProperty
    private LimitRule limitRule = new LimitRule().init();
    /**特殊接口限流规则配置Map*/
    private Map<String, LimitRule> limitMethodMap = new HashMap<>(5);

    public String getEnabled() {
        return enabled;
    }

    public void setEnabled(String enabled) {
        this.enabled = enabled;
    }

    public String getImpl() {
        return impl;
    }

    public void setImpl(String impl) {
        this.impl = impl;
    }

    public String getExpression() {
        return expression;
    }

    public void setExpression(String expression) {
        this.expression = expression;
    }

    public LimitRule getLimitRule() {
        return limitRule;
    }

    public void setLimitRule(LimitRule limitRule) {
        this.limitRule = limitRule;
    }

    public Map<String, LimitRule> getLimitMethodMap() {
        return limitMethodMap;
    }

    public void setLimitMethodMap(Map<String, LimitRule> limitMethodMap) {
        this.limitMethodMap = limitMethodMap;
    }
}

限流规则配置

/**
 * 限流规则配置
 * <li>单个接口方法限流规则配置</li>
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class LimitRule {
    /** 给定的时间范围 单位(秒),默认值*/
    public static final int DEFAULT_PERIOD = 1;
    /** 一定时间内最多访问次数,默认值*/
    public static final int DEFAULT_COUNT = 50;
    /**
     * defaultKey
     */
    private String defaultKey = "";
    /**
     * key
     */
    private String key;

    /**
     * Key的前缀
     */
    private String prefix;
    /**
     * 给定的时间范围 单位(秒)
     */
    private int period;

    /**
     * 一定时间内最多访问次数
     */
    private int count;
    /**
     * 限流类型
     */
    private LimitType limitType;
    /**
     * 降级策略:默认快速失败
     */
    private FallbackStrategy fallbackStrategy;
    /**
     * 当降级策略为:回退 时回退处理接口实现名称
     */
    private String fallbackImpl;
    /**
     * 初始化
     * <li></li>
     * @return: void
     */
    public LimitRule init(){
        this.setPeriod(DEFAULT_PERIOD);
        this.setCount(DEFAULT_COUNT);
        this.setLimitType(LimitType.CUSTOMER);
        this.setFallbackStrategy(FallbackStrategy.FAIL_FAST);
        this.setFallbackImpl(LimitConfig.DEFAULT_IMPL);
        return this;
    }
}

限流配置处理器

/**
 * 限流配置处理器
 * <li>处理配置方式限流</li>
 */
@Slf4j
public class ConfigLimitHandler extends AbstractLimitHandler<Object> implements MethodInterceptor {
    @Nullable
    @Override
    public Object invoke(@Nonnull MethodInvocation methodInvocation) throws Throwable {
        // 获取被拦截的类名
        String className = methodInvocation.getClass().getName();
        // 获取被拦截的方法名
        String methodName = methodInvocation.getMethod().getName();
        String defaultKey = StringUtils.join(className,".", methodName);
        //获取配置文件中的限流规则
        LimitRule limitRule = lockConfig.getLimitRule();
        //如果需要特殊处理
        if(lockConfig.getLimitMethodMap().containsKey(methodName)){
            limitRule = lockConfig.getLimitMethodMap().get(methodName);
        }
        limitRule.setDefaultKey(defaultKey);
        log.info("[ConfigLimitHandler限流交易请求],尝试获取方法:{},执行权限", defaultKey);
        return handle(limitRule, () -> {
            try {
                return methodInvocation.proceed();
            } catch (Throwable throwable) {
                log.error("[ConfigLimitHandler限流交易请求],方法执行异常",throwable);
            }
            return null;
        });
    }
}

限流切面Advisor

/**
 * 限流切面Advisor
 * <li></li>
 */
@Slf4j
public class LimitPointcutAdvisor extends AbstractBeanFactoryPointcutAdvisor {
    @Autowired
    private LimitConfig lockConfig;
    @Override
    public Pointcut getPointcut() {
        AspectJExpressionPointcut adapterPointcut = new AspectJExpressionPointcut();
        //从配置文件中获取PointCut表达式
        adapterPointcut.setExpression(lockConfig.getExpression());
        return adapterPointcut;
    }
}

6,支持限流回退

  • 服务降级回退处理接口默认实现
/**
 * 服务降级回退处理接口默认实现
 * <li></li>
 */
public class DefaultFallback implements Fallback<Object> {
    /**
     * 服务降级处理
     * <li></li>
     * @return: java.lang.Object 返回对象
     */
    @Override
    public Object getFallback() {
        return null;
    }
}

7,其他辅助类

  • 降级策略枚举类
/**
 * 降级策略
 * <li></li>
 */
public enum FallbackStrategy {
    /**
     * 快速失败
     */
    FAIL_FAST,
    /**
     * 回退
     */
    FALLBACK;
}
  • 限流类型枚举类
/**
 * 限流类型
 * <p>说明:</p>
 * <li></li>
 */
public enum LimitType {
    /**
     * 自定义key
     */
    CUSTOMER,

    /**
     * 请求者IP
     */
    IP;
}
  • 限流异常类
/**
 * 限流异常
 * <li></li>
 */
public class LimitException extends RuntimeException{
    /**错误码*/
    protected String code;
    public LimitException() {
    }

    public LimitException(Throwable ex) {
        super(ex);
    }
    public LimitException(String message) {
        super(message);
    }
    public LimitException(String code, String message) {
        super(message);
        this.code = code;
    }
    public LimitException(String message, Throwable ex) {
        super(message, ex);
    }
}
  • 抽象限流处理器
/**
 * 抽象限流处理器
 * <p>说明:</p>
 * <li></li>
 */
@Slf4j
public abstract class AbstractLimitHandler<R> {
    @Autowired
    protected LimitConfig lockConfig;
    @Autowired
    private RateLimiter rateLimiter;

    /**
     * 限流处理
     * <li></li>
     * @param limitRule: 限流规则
     * @param function: 结果提供函数
     * @return: R
     */
    protected R handle(LimitRule limitRule, Supplier<R> function){
        //规则整理
        handleLimitRule(limitRule);

        ImmutableList<String> keys = ImmutableList.of(StringUtils.join(limitRule.getPrefix(), limitRule.getKey()));
        try {
            if (rateLimiter.tryAcquire(keys,limitRule.getCount(), limitRule.getPeriod())) {
                log.info("[限流交易请求],尝试获取执行权限成功,开始执行目标方法");
                return function.get();
            }else{
                log.info("[限流交易请求],尝试获取执行权限失败,服务降级处理,降级策略:{}",limitRule.getFallbackStrategy());
                if(FallbackStrategy.FAIL_FAST.equals(limitRule.getFallbackStrategy())){
                    throw new LimitException("访问过于频繁,超出访问限制");
                }
                //获取回退处理接口实现:如果为空则使用默认实现
                Fallback<R> fallback = ExtensionLoader.getExtensionLoader(Fallback.class).getExtension(limitRule.getFallbackImpl());
                fallback = fallback != null ? fallback : ExtensionLoader.getExtensionLoader(Fallback.class).getDefaultExtension();
                return fallback.getFallback();
            }
        } catch (Throwable e) {
            log.error("[限流服务执行异常]",e);
            if (e instanceof LimitException) {
                throw e;
            }
            throw new LimitException("限流服务执行异常",e);
        }
    }
  
    /**
     * 限流规则整理
     * <li></li>
     * @param limitRule: 限流规则
     * @return: void
     */
    private void handleLimitRule(LimitRule limitRule){
        //限流规则整理:如果为空则使用全局配置
        limitRule.setKey(StringUtils.isBlank(limitRule.getKey()) ? lockConfig.getLimitRule().getKey() : limitRule.getKey());
        limitRule.setCount(limitRule.getCount() == 0 ? lockConfig.getLimitRule().getCount() : limitRule.getCount());
        limitRule.setFallbackImpl(StringUtils.isBlank(limitRule.getFallbackImpl()) ? lockConfig.getLimitRule().getFallbackImpl() : limitRule.getFallbackImpl());
        limitRule.setFallbackStrategy(limitRule.getFallbackStrategy() == null ? lockConfig.getLimitRule().getFallbackStrategy() : limitRule.getFallbackStrategy());
        limitRule.setLimitType(limitRule.getLimitType() == null ? lockConfig.getLimitRule().getLimitType() : limitRule.getLimitType());
        limitRule.setPeriod(limitRule.getPeriod() == 0 ? lockConfig.getLimitRule().getPeriod() : limitRule.getPeriod());
        limitRule.setPrefix(StringUtils.isBlank(limitRule.getPrefix()) ? lockConfig.getLimitRule().getPrefix() : limitRule.getPrefix());

        String key = limitRule.getKey();
        //根据限流类型获取不同的key ,如果不传我们会以方法名作为key
        switch (limitRule.getLimitType()) {
            case IP:
                key = WebUtil.getIpAddress();
                break;
            case CUSTOMER:
                key = StringUtils.isBlank(key) ? StringUtils.upperCase(limitRule.getDefaultKey()) : key;
                break;
            default:
                key = StringUtils.upperCase(limitRule.getDefaultKey());
        }
        limitRule.setKey(key);
    }
}
  • 工具类
/**
 * 工具类
 * <li></li>
 */
public class WebUtil {
    private static final String UNKNOWN = "unknown";

    /**
     * 获取IP
     * <p>说明:</p>
     * <li></li>
     */
    public static String getIpAddress() {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String ip = request.getHeader("x-forwarded-for");
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip;
    }
}
  • 自动配置类
/**
 * 自动配置类
 * <li></li>
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(value = LimitConfig.class)
@ConditionalOnClass(RateLimiter.class)
@ConditionalOnProperty(prefix = LimitConfig.PREFIX, value = LimitConfig.ENABLED, matchIfMissing = true)
public class LimitAutoConfiguration {
  @Autowired
    private LimitConfig lockConfig;

  @Bean
  @ConditionalOnMissingBean(RateLimiter.class)
  public RateLimiter createRateLimiter() {
        log.info("初始化分布式限流对象,实现类名称:{}",lockConfig.getImpl());
        RateLimiterHolder.rateLimiter = ExtensionLoader.getExtensionLoader(RateLimiter.class).getExtension(lockConfig.getImpl());
        log.info("初始化分布式限流对象成功,实现类:{}", RateLimiterHolder.rateLimiter);
        return RateLimiterHolder.rateLimiter;
  }
    
    @Bean
    @ConditionalOnProperty(prefix = LimitConfig.PREFIX, value = LimitConfig.EXP)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public LimitPointcutAdvisor adapterServiceAdvisor() {
        LimitPointcutAdvisor advisor = new LimitPointcutAdvisor();
        advisor.setAdviceBeanName("limitPointcutAdvisor");
        advisor.setAdvice(createConfigLimitHandler());
        advisor.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return advisor;
    }
    @Bean
    public ConfigLimitHandler createConfigLimitHandler() {
        return new ConfigLimitHandler();
    }

    @Bean
    @ConditionalOnMissingBean(LimitPointcutAdvisor.class)
    public AnnotationLimitHandler createAnnotationLimitHandler() {
        return new AnnotationLimitHandler();
    }
}
  • 限流接口对象持有者
/**
 * 限流接口对象持有者
 * <p>说明:</p>
 * <li></li>
 */
public class RateLimiterHolder {
    /** 限流对象*/
    static RateLimiter rateLimiter;

    public static Optional<RateLimiter> getRateLimiter() {
        return Optional.ofNullable(rateLimiter);
    }
}

8,资源文件

  • spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.javacoo.limit.client.internal.redis.LimitRedisConfig,
  com.javacoo.limit.starter.LimitAutoConfiguration

com.javacoo.limit.client.api.Fallback

default=com.javacoo.limit.client.internal.fallback.DefaultFallback

com.javacoo.limit.client.api.RateLimiter

default=com.javacoo.limit.client.internal.redis.TokenBucketRateLimiter
fixedWindow=com.javacoo.limit.client.internal.redis.FixedWindowRateLimiter
slidingWindow=com.javacoo.limit.client.internal.redis.SlidingWindowRateLimiter

问题及局限性

问题

  • 紧经过小规模生产验证,虽满足业务需求,但是还不算成熟,仍需更多测试验证。
  • 仅供学习参考,如需用于生产,请谨慎,并多测试。
  • 推荐使用Spring Clound Gateway,Sentinel等专业流量防护组件 局限性
  • 限流组件保证了高可用,牺牲了性能,增加了一层 IO 环节的开销,单机限流在本地,分布式限流还要通过网络协议。
  • 限流组件保证了高可用,牺牲了一致性,在大流量的情况下,请求的处理会出现延迟的情况,这种场景便无法保证强一致性。特殊情况下,还无法保证最终一致性,部分请求直接被抛弃。
  • 限流组件拥有流控权,若限流组件挂了,会引起雪崩效应,导致请求与业务的大批量失败。
  • 引入限流组件,增加系统的复杂程度,开发难度增加,限流中间件的设计本身就是一个复杂的体系,需要综合业务与技术去思考与权衡,同时还要确保限流组件本身的高可用与性能,极大增加工作量,甚至需要一个团队去专门开发。

内容出处:,

声明:本网站所收集的部分公开资料来源于互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。文章链接:http://www.yixao.com/share/27208.html

发表评论

登录后才能评论