使用注解形式实现分布式锁
用到的包
implementation 'cn.hutool:hutool-all:4.5.0'
implementation 'org.redisson:redisson-spring-boot-starter:3.10.6'
新建注解类
/**
 * Marks a method whose invocation must be wrapped in a Redisson distributed lock.
 * The lock key is built by RedisLockAspect as
 * "redisson_lock:" + lockName() + ":" + the evaluated key() expression.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {
/**
* Name of the Redis lock; it forms the fixed part of the lock key.
*
* @return the lock name, empty by default
*/
String lockName() default "";
/**
* Variable part of the lock key. May be a SpEL expression that refers to the
* arguments of the annotated method; a blank value contributes nothing to the key.
*
* @return the SpEL expression, empty by default
*/
String key() default "";
/**
* Lease time handed to RLock.lock(leaseTime, unit), expressed in {@link #timeUnit()}.
* With the defaults (5000 and MILLISECONDS) the lease is 5 seconds. Per the Redisson
* documentation the lock is released automatically once this lease elapses, even if
* the annotated method is still running.
*
* @return the lease time, 5000 by default
*/
int expire() default 5000;
/**
* Unit in which {@link #expire()} is expressed.
*
* @return the time unit, milliseconds by default
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
新建Aspect
/**
 * Around-advice that runs every method annotated with {@link RedisLock} while holding a
 * Redisson distributed lock.
 */
@Aspect
@Component
public class RedisLockAspect {

    /** Prefix shared by every lock key created by this aspect. */
    private static final String REDISSON_LOCK_PREFIX = "redisson_lock:";

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Acquires the lock described by the annotation, invokes the target method and
     * releases the lock afterwards.
     *
     * @param joinPoint the intercepted invocation
     * @param redisLock the annotation found on the intercepted method
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("@annotation(redisLock)")
    public Object around(ProceedingJoinPoint joinPoint, RedisLock redisLock) throws Throwable {
        String redisKey = getRedisKey(joinPoint, redisLock.lockName(), redisLock.key());
        RLock rLock = redissonClient.getLock(redisKey);
        // Blocks until the lock is obtained; the lock then carries a lease of expire/timeUnit.
        rLock.lock(redisLock.expire(), redisLock.timeUnit());
        try {
            return joinPoint.proceed();
        } finally {
            // The lease may have run out while the method was executing. Calling unlock()
            // on a lock this thread no longer owns throws IllegalMonitorStateException,
            // which would replace the method's result or its original exception.
            // The check narrows that window; it cannot close it completely.
            if (rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
    }

    /**
     * Builds the Redis key of the lock: prefix + lock name + ':' + evaluated SpEL expression.
     *
     * @param joinPoint the intercepted invocation, source of target, method and arguments
     * @param lockName  fixed part of the key
     * @param spel      SpEL expression producing the variable part of the key; may be blank
     * @return the complete Redis key
     */
    private String getRedisKey(ProceedingJoinPoint joinPoint, String lockName, String spel) {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method targetMethod = methodSignature.getMethod();
        String keySuffix =
            SpelUtil.parse(joinPoint.getTarget(), spel, targetMethod, joinPoint.getArgs());
        return REDISSON_LOCK_PREFIX + lockName + StrUtil.COLON + keySuffix;
    }
}
序列化用到的一些类
被redisson使用
/**
 * Redisson codec that encodes and decodes values with FST, using the shared
 * configuration exposed by {@code FSTSerializer}.
 */
public class FstCodec extends BaseCodec {

    private final FSTConfiguration config;

    /** Reads one object from the buffer; any non-IO failure is reported as an IOException. */
    private final Decoder<Object> decoder = new Decoder<Object>() {
        @Override
        public Object decode(ByteBuf buf, State state) throws IOException {
            FSTObjectInput objectInput = config.getObjectInput(new ByteBufInputStream(buf));
            try {
                return objectInput.readObject();
            } catch (Exception e) {
                throw asIoException(e);
            }
        }
    };

    /** Writes one object into a freshly allocated buffer, releasing the buffer on failure. */
    private final Encoder encoder = new Encoder() {
        @Override
        public ByteBuf encode(Object in) throws IOException {
            ByteBuf target = ByteBufAllocator.DEFAULT.buffer();
            ByteBufOutputStream sink = new ByteBufOutputStream(target);
            FSTObjectOutput objectOutput = config.getObjectOutput(sink);
            try {
                objectOutput.writeObject(in);
                objectOutput.flush();
                return sink.buffer();
            } catch (Exception e) {
                // The caller never sees the buffer on failure, so it is released here.
                target.release();
                throw asIoException(e);
            }
        }
    };

    public FstCodec() {
        config = new FSTSerializer().getConfig();
    }

    /** Passes IOExceptions through unchanged and wraps every other exception in one. */
    private static IOException asIoException(Exception e) {
        if (e instanceof IOException) {
            return (IOException) e;
        }
        return new IOException(e);
    }

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }

    /** Prefers the class loader set on the FST configuration, else the inherited one. */
    @Override
    public ClassLoader getClassLoader() {
        ClassLoader configured = config.getClassLoader();
        return configured != null ? configured : super.getClassLoader();
    }
}
使用 FST 进行 Redis 的序列化
/**
 * Spring Data Redis serializer backed by FST. A {@code null} object maps to an empty
 * byte array and an empty or {@code null} byte array maps back to {@code null}.
 */
public class FstRedisSerializer implements RedisSerializer<Object> {

    private static final byte[] EMPTY_ARRAY = new byte[0];

    /**
     * @param o the object to serialize, may be {@code null}
     * @return the FST bytes of {@code o}, or an empty array when {@code o} is {@code null}
     */
    @Override
    @SneakyThrows
    public byte[] serialize(Object o) {
        return o == null ? EMPTY_ARRAY : fstConfig().asByteArray(o);
    }

    /**
     * @param bytes the FST bytes to read, may be {@code null} or empty
     * @return the decoded object, or {@code null} when there is nothing to decode
     */
    @Override
    @SneakyThrows
    public Object deserialize(byte[] bytes) {
        boolean nothingToDecode = bytes == null || bytes.length == 0;
        if (nothingToDecode) {
            return null;
        }
        return fstConfig().asObject(bytes);
    }

    /** Looks up the FST configuration through a fresh FSTSerializer on every call. */
    private static FSTConfiguration fstConfig() {
        return new FSTSerializer().getConfig();
    }
}
使用 FST 进行序列化
/**
 * Gives access to one shared {@link FSTConfiguration} that is wired with a custom
 * stream coder factory.
 */
public class FSTSerializer {

    /**
     * Stream coder factory whose decoders null out two private fields of
     * {@link FSTStreamDecoder} ("chBufS" and "ascStringCache") after each string read.
     * NOTE(review): presumably this stops large decoded strings from staying referenced
     * by the decoder; confirm against the FST sources.
     */
    static class FSTDefaultStreamCoderFactory implements FSTConfiguration.StreamCoderFactory {

        static ThreadLocal input = new ThreadLocal();
        static ThreadLocal output = new ThreadLocal();

        Field chBufField;
        Field ascStringCacheField;
        private FSTConfiguration fstConfiguration;

        FSTDefaultStreamCoderFactory(FSTConfiguration fstConfiguration) {
            // Resolve the private decoder fields reflectively; a lookup failure is fatal.
            try {
                chBufField = FSTStreamDecoder.class.getDeclaredField("chBufS");
                ascStringCacheField = FSTStreamDecoder.class.getDeclaredField("ascStringCache");
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            ascStringCacheField.setAccessible(true);
            chBufField.setAccessible(true);
            this.fstConfiguration = fstConfiguration;
        }

        @Override
        public FSTEncoder createStreamEncoder() {
            return new FSTStreamEncoder(fstConfiguration);
        }

        @Override
        public FSTDecoder createStreamDecoder() {
            return new FSTStreamDecoder(fstConfiguration) {
                @Override
                public String readStringUTF() throws IOException {
                    String decoded;
                    try {
                        decoded = super.readStringUTF();
                        chBufField.set(this, null);
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    return decoded;
                }

                @Override
                public String readStringAsc() throws IOException {
                    String decoded;
                    try {
                        decoded = super.readStringAsc();
                        ascStringCacheField.set(this, null);
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    return decoded;
                }
            };
        }

        @Override
        public ThreadLocal getInput() {
            return input;
        }

        @Override
        public ThreadLocal getOutput() {
            return output;
        }
    }

    /** Holder class: the configuration is created when this class is first initialised. */
    private static class InstanceHolder {
        private static final FSTConfiguration INSTANCE = FSTConfiguration.createDefaultConfiguration();

        static {
            INSTANCE.setStreamCoderFactory(new FSTDefaultStreamCoderFactory(INSTANCE));
        }
    }

    /**
     * @return the shared FST configuration; every call returns the same instance
     */
    public FSTConfiguration getConfig() {
        return InstanceHolder.INSTANCE;
    }
}
spel用到的util
/**
 * Helper that evaluates a SpEL expression against the arguments of a method invocation.
 */
public class SpelUtil {

    /**
     * Shared discoverer. It caches what it reads per class, so reusing one instance
     * avoids re-reading the class bytecode on every call.
     */
    private static final LocalVariableTableParameterNameDiscoverer PARAMETER_NAME_DISCOVERER =
        new LocalVariableTableParameterNameDiscoverer();

    /** Shared parser; SpelExpressionParser instances are reusable and thread-safe. */
    private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();

    /**
     * Evaluates {@code spel} with the method arguments exposed as variables, both by
     * parameter name and by index (for example {@code #p0}).
     *
     * @param rootObject root object of the evaluation, the object that owns {@code method}
     * @param spel       the expression to evaluate
     * @param method     the target method
     * @param args       the arguments of the invocation
     * @return an empty string when {@code spel} is blank; {@code spel} itself, unevaluated,
     *         when no parameter names can be discovered for {@code method}; otherwise the
     *         expression result converted to a String
     */
    public static String parse(Object rootObject, String spel, Method method, Object[] args) {
        if (StrUtil.isBlank(spel)) {
            return StrUtil.EMPTY;
        }
        // Parameter names of the intercepted method, resolved through Spring's support class.
        String[] paraNameArr = PARAMETER_NAME_DISCOVERER.getParameterNames(method);
        if (ArrayUtil.isEmpty(paraNameArr)) {
            return spel;
        }
        StandardEvaluationContext context =
            new MethodBasedEvaluationContext(rootObject, method, args, PARAMETER_NAME_DISCOVERER);
        // Bind each argument under its parameter name before evaluating the key expression.
        for (int i = 0; i < paraNameArr.length; i++) {
            context.setVariable(paraNameArr[i], args[i]);
        }
        return EXPRESSION_PARSER.parseExpression(spel).getValue(context, String.class);
    }
}
配置文件
spring.redis.redisson.config=classpath:redisson.yml
redisson.yml内容
# Single-node (standalone) Redis settings
singleServerConfig:
  address: redis://192.168.1.157:6379
  database: 1
  password: null
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  clientName: null
  # Minimum number of idle publish/subscribe connections (default 1)
  subscriptionConnectionMinimumIdleSize: 1
  # Publish/subscribe connection pool size (default 50)
  subscriptionConnectionPoolSize: 10
  # Maximum number of subscriptions per connection (default 5)
  subscriptionsPerConnection: 5
  # Minimum number of idle connections (default 32); that many are not needed for now
  connectionMinimumIdleSize: 4
  # Connection pool size (default 64); that many are not needed for now
  connectionPoolSize: 20
# Size of the thread pool shared by all RTopic listeners, RRemoteService invokers
# and RExecutorService tasks.
threads: 0
# Size of the thread pool shared, within one Redisson instance, by all distributed
# data types and services it creates and by the underlying client.
nettyThreads: 0
codec:
  # Must be the fully qualified name of the FstCodec class in your project
  class: com.xxx.api.redisson.serializer.FstCodec
transportMode: NIO
使用演示
/**
 * Loads the entities matching {@code code} and converts each one to an ExampleDto.
 * The call runs under the distributed lock named "getExampleListByCodeLock".
 *
 * @param code the code to look up
 * @return one DTO per entity returned by the mapper
 */
@RedisLock(lockName = "getExampleListByCodeLock")
public List<ExampleDto> getExampleListByCode(String code) {
    List<ExampleEntity> entities = exampleMapper.selectListByCode(code);
    List<ExampleDto> dtos = entities.stream()
        .map(entity -> BeanUtil.copyByClass(entity, ExampleDto.class))
        .collect(Collectors.toList());
    return dtos;
}