header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
//如果为真,通过拦截
if (response.isAllowed()) {
return chain.filter(exchange);
}
//否则设置http码为429,too many request
exchange.getResponse().setStatusCode(config.getStatusCode());
return exchange.getResponse().setComplete();
}));
};
}
}
分析:
1.加载KeyResolver,从配置文件中加载,此处我配置了hostAddrKeyResolver,即根据host地址来进行限流。如果为空,使用默认的PrincipalNameKeyResolver
2.加载RateLimiter,默认使用RedisRateLimiter。
3.执行RedisRateLimiter的isAllowed方法,得到response,如果isAllowed为true则通过拦截,否则返回429(isAllowed方法具体实现下文描述)。
## HostAddrKeyResolver:
@Slf4j
public class HostAddrKeyResolver implements KeyResolver {
@Override
public Mono resolve(ServerWebExchange exchange) {
log.info("HostAddrKeyResolver 限流");
return Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}
}
在启动类中注入bean
@Bean
public HostAddrKeyResolver hostAddrKeyResolver() {
return new HostAddrKeyResolver();
}
## RedisRateLimiter:
@Override
@SuppressWarnings("unchecked")
public Mono isAllowed(String routeId, String id) {
//判断是否初始化
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
//获取配置
Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
if (routeConfig == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
//令牌桶填充速率
int replenishRate = routeConfig.getReplenishRate();
//令牌桶可容纳令牌数
int burstCapacity = routeConfig.getBurstCapacity();
try {
//获取redis的key,执行lua脚本时传入
List keys = getKeys(id);
//获取参数,执行lua脚本时传入
List scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
Flux> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
/*
* We don't want a hard dependency on Redis to allow traffic. Make sure to set
* an alert so you know if this is happening too much. Stripe's observed
* failure rate is 0.01%.
*/
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
@NotNull
public HashMap getHeaders(Config config, Long tokensLeft) {
HashMap headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(config.getReplenishRate()));
headers.put(this.burstCapacityHeader, String.valueOf(config.getBurstCapacity()));
return headers;
}
static List getKeys(String id) {
// use {} around keys to use Redis Key hash tags
// this allows for using redis cluster
// Make a unique key per user.
String prefix = "request_rate_limiter.{" + id;
//令牌桶剩余令牌数
String tokenKey = prefix + "}.tokens";
//令牌桶最后填充令牌时间
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
分析:
1.判断是否初始化,加载配置,获取令牌填充速率和令牌桶大小
2.根据路由id组合成两个redis中的key值,传入lua脚本
request_rate_limiter.{id}.tokens 令牌桶剩余令牌数
request_rate_limiter.{id}.timestamp 令牌桶最后填充令牌时间
3.把令牌填充速率,令牌桶大小,当前时间(单位:秒),消耗令牌数(默认为1)组合传入lua脚本
4.执行lua脚本
5.flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) 这个是对执行lua脚本过程中发生异常的处理,它会忽略异常,返回令牌。这样就能跟redis解耦,不对它强依赖。
该实现核心主要体现在lua脚本上,它使用的是令牌桶算法
详见spring-cloud-gateway-core下的request_rate_limiter.lua
## 获取剩余令牌数的redis key
local tokens_key = KEYS[1]
## 获取最后一次填充令牌的时间
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
## 令牌填充速率
local rate = tonumber(ARGV[1])
## 令牌桶大小
local capacity = tonumber(ARGV[2])
## 当前秒数
local now = tonumber(ARGV[3])
## 消耗令牌数,默认1
local requested = tonumber(ARGV[4])
## 计算令牌桶需要填充的时间
local fill_time = capacity/rate
## 计算key的存活时间
local ttl = math.floor(fill_time2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
## 获取剩余的令牌数
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
## 获取令牌最后填充时间
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
## 计算得到剩余的令牌数
local filled_tokens = math.min(capacity, last_tokens+(deltarate))
## 大于请求消耗令牌 allowed 设为true
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }
看完上述内容,你们掌握SpringCloud中怎么实现gateway限流的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
标题名称:SpringCloud中怎么实现gateway限流
文章URL:http://wjwzjz.com/article/ijecep.html