场景:调用上游的接口,上游不同接口有不同的调用频率限制,我们全量同步有上百万条数据,如何快速同步
例子:
上游有接口A,B,C
A接口频率 600次/min
B接口频率 200次/min
C接口频率 1000次/min
如果超过频率,调用会报错
写了一个本地做好限流的方法,要求所有的记录都要调用且 减少调用报错的问题
因为借助了Redistemplate 暂不考虑 原子性问题
@Slf4j
public class RateLimitUtil {
RedisUtils redisUtils;
// 频次:limit/timeSecond
//频率
private Integer limit;
//时间单位秒
private Integer timeSecond;
public RateLimitUtil(RedisUtils redisUtils, Integer limit, Integer timeSecond) {
this.redisUtils = redisUtils;
this.limit = limit;
this.timeSecond = timeSecond;
}
/**
* 非分布式锁
* @param clazz
* @param method
*/
public synchronized void lock(Class clazz, String method){
String key = clazz.toString()+"."+method;
String valueStr ;
int value;
while (true){
valueStr = redisUtils.getValue(key);
if(valueStr == null){
boolean putFlag = redisUtils.setNx(key, 1);
if(putFlag){
redisUtils.expireSeconds(key,timeSecond);
break ;
}
continue;
}
value = Integer.valueOf(valueStr);
if(value >= limit){
long expire = redisUtils.getExpire(key);
sleepSecond(expire+1);
if(expire>0){
log.info("{} sleep:{} 秒",key,expire);
}
continue;
}
redisUtils.incr(key,1L);
break;
}
}
public void sleepSecond(long n){
try {
Thread.sleep(n*1000);
} catch (InterruptedException e) {
e.printstacktrace();
}
}
public void sleep(long n){
try {
Thread.sleep(n);
} catch (InterruptedException e) {
e.printstacktrace();
}
}
/**
* 分布式锁
* @param clazz
* @param method
*/
public void lockdistributed(Class clazz,String method){
String lockKey = clazz+"."+method+"_lock";
try {
while (true){
boolean result = redisUtils.setNx(lockKey, Thread.currentThread().getId());
if(result){
redisUtils.expireSeconds(lockKey,timeSecond);
lock(clazz,method);
break;
}else {
sleepSecond(10);
}
}
}catch (Exception e){
}finally {
String value = redisUtils.getValue(lockKey);
if(value!= null){
if(String.valueOf(Thread.currentThread().getId()).equals(value)){
redisUtils.del(lockKey);
}
}
}
}
}
使用例子:
@Test
public void testLimit1(){
RateLimitUtil rateLimitUtil = new RateLimitUtil(redisUtils,10,6);
Random random = new Random();
for (int i=0;i<100;i++){
try {
Thread.sleep(random.nextInt(200));
}catch (Exception e){
}
long time = System.currentTimeMillis()/1000;
rateLimitUtil.lockdistributed(this.getClass(),"testLimit");
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。