PowerJob UseCacheLock Workflow: A Source Code Walkthrough
Author: codecraft
Preface
This article looks at PowerJob's UseCacheLock.
UseCacheLock
tech/powerjob/server/core/lock/UseCacheLock.java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {

    String type();

    String key();

    int concurrencyLevel();
}
The UseCacheLock annotation defines three attributes: type, key, and concurrencyLevel.
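To make the annotation's shape concrete, here is a minimal sketch that declares a local copy of it, applies it to a method, and reads the three attributes back through reflection. DemoService, its handle method, and the attribute values are hypothetical and exist only for this illustration; PowerJob itself reads the annotation through a Spring AOP aspect rather than through direct reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class UseCacheLockReflectionDemo {

    // Local copy of the annotation shape shown above, so the sketch compiles on its own.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface UseCacheLock {
        String type();
        String key();
        int concurrencyLevel();
    }

    // Hypothetical service, for illustration only.
    static class DemoService {
        @UseCacheLock(type = "demoLock", key = "#id", concurrencyLevel = 16)
        public void handle(Long id) {
            // business logic would go here
        }
    }

    // Reads the annotation from DemoService#handle and returns "type|key|concurrencyLevel".
    static String describe() {
        try {
            Method m = DemoService.class.getMethod("handle", Long.class);
            UseCacheLock lock = m.getAnnotation(UseCacheLock.class);
            return lock.type() + "|" + lock.key() + "|" + lock.concurrencyLevel();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because the retention policy is RUNTIME, the attribute values remain available on the Method object at run time, which is what lets an aspect receive them.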
UseCacheLockAspect
tech/powerjob/server/core/lock/UseCacheLockAspect.java
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {

    private final MonitorService monitorService;

    private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();

    private static final long SLOW_THRESHOLD = 100;

    @Around(value = "@annotation(useCacheLock))")
    public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
        Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
            int concurrencyLevel = useCacheLock.concurrencyLevel();
            log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
            return CacheBuilder.newBuilder()
                    .initialCapacity(300000)
                    .maximumSize(500000)
                    .concurrencyLevel(concurrencyLevel)
                    .expireAfterWrite(30, TimeUnit.MINUTES)
                    .build();
        });
        final Method method = AOPUtils.parseMethod(point);
        Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
        final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
        long start = System.currentTimeMillis();
        reentrantLock.lockInterruptibly();
        try {
            long timeCost = System.currentTimeMillis() - start;
            if (timeCost > SLOW_THRESHOLD) {
                final SlowLockEvent slowLockEvent = new SlowLockEvent()
                        .setType(SlowLockEvent.Type.LOCAL)
                        .setLockType(useCacheLock.type())
                        .setLockKey(String.valueOf(key))
                        .setCallerService(method.getDeclaringClass().getSimpleName())
                        .setCallerMethod(method.getName())
                        .setCost(timeCost);
                monitorService.monitor(slowLockEvent);
                log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost, key, JSON.toJSONString(point.getArgs()));
            }
            return point.proceed();
        } finally {
            reentrantLock.unlock();
        }
    }
}
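UseCacheLockAspect intercepts methods annotated with @UseCacheLock. Its lockContainer maps each useCacheLock.type() to a Cache. The Cache is a Guava Cache built with an initialCapacity of 300000, a maximumSize of 500000, the concurrencyLevel taken from the annotation, and an expireAfterWrite of 30 minutes; its key is the lock key and its value is a ReentrantLock. The execute method first calls reentrantLock.lockInterruptibly(), then point.proceed(), and finally reentrantLock.unlock() in a finally block. Before calling point.proceed(), it also measures how long acquiring the lock took; if that exceeds SLOW_THRESHOLD (100 ms), it reports a SlowLockEvent through monitorService.monitor and logs a warning.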
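The lock-then-proceed flow described above can be sketched with the JDK alone. This is a simplified illustration, not PowerJob's implementation: it assumes a plain ConcurrentHashMap in place of the Guava Cache (so locks are never evicted or expired), a counter in place of monitorService, and a Supplier in place of the join point. The names KeyedLockSketch, lockFor, runLocked, and slowCount are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class KeyedLockSketch {

    // Mirrors SLOW_THRESHOLD in the aspect: waits longer than this count as slow.
    private static final long SLOW_THRESHOLD_MS = 100;

    // type -> (key -> lock). Assumption: a plain map stands in for the Guava Cache,
    // so nothing here is size-bounded or expired after 30 minutes.
    private final Map<String, Map<String, ReentrantLock>> lockContainer = new ConcurrentHashMap<>();

    // Stand-in for monitorService.monitor(...): counts slow acquisitions.
    final AtomicInteger slowCount = new AtomicInteger();

    // Returns the single lock shared by all callers using the same type and key.
    ReentrantLock lockFor(String type, Object key) {
        return lockContainer
                .computeIfAbsent(type, ignore -> new ConcurrentHashMap<>())
                .computeIfAbsent(String.valueOf(key), ignore -> new ReentrantLock());
    }

    // Acquires the lock, records a slow wait if needed, runs the body, then unlocks.
    <T> T runLocked(String type, Object key, Supplier<T> body) {
        ReentrantLock lock = lockFor(type, key);
        long start = System.currentTimeMillis();
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            // The lock was not acquired, so there is nothing to unlock.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for lock", e);
        }
        try {
            long waited = System.currentTimeMillis() - start;
            if (waited > SLOW_THRESHOLD_MS) {
                slowCount.incrementAndGet();
            }
            return body.get();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        KeyedLockSketch sketch = new KeyedLockSketch();
        String result = sketch.runLocked("processJobInstance", 42L, () -> "done");
        System.out.println(result);
    }
}
```

Calls that share a type and key are serialized on one ReentrantLock, while calls with different keys use different locks and do not block each other. The unlock sits in a finally block so the lock is released even when the body throws.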
Example
@UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
public void redispatchAsync(Long instanceId, int originStatus) {
    // Reset the status to waiting-for-dispatch
    instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
}
The key attribute supports SpEL; in this example, #instanceId refers to the method's instanceId argument.
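Summary
PowerJob's UseCacheLock annotation defines the type, key, and concurrencyLevel attributes. UseCacheLockAspect intercepts methods annotated with @UseCacheLock and uses lockContainer to map each useCacheLock.type() to a Cache, whose key is the lock key and whose value is a ReentrantLock. The lock itself is acquired with reentrantLock.lockInterruptibly().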