java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > PowerJob LockService

PowerJob LockService方法工作流程源码解读

作者:codecraft

这篇文章主要为大家介绍了PowerJob LockService方法工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的LockService

LockService

tech/powerjob/server/extension/LockService.java

public interface LockService {
    /**
     * 上锁(获取锁),立即返回,不会阻塞等待锁
     * @param name 锁名称
     * @param maxLockTime 最长持有锁的时间,单位毫秒(ms)
     * @return true -> 获取到锁,false -> 未获取到锁
     */
    boolean tryLock(String name, long maxLockTime);
    /**
     * 释放锁
     * @param name 锁名称
     */
    void unlock(String name);
}
LockService接口定义了tryLock、unlock方法

DatabaseLockService

tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java

@Slf4j
@Service
public class DatabaseLockService implements LockService {
    private final String ownerIp;
    private final OmsLockRepository omsLockRepository;
    @Autowired
    public DatabaseLockService(OmsLockRepository omsLockRepository) {
        this.ownerIp = NetUtils.getLocalHost();
        this.omsLockRepository = omsLockRepository;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int num = omsLockRepository.deleteByOwnerIP(ownerIp);
            log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
        }));
    }
    @Override
    public boolean tryLock(String name, long maxLockTime) {
        OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
        try {
            omsLockRepository.saveAndFlush(newLock);
            return true;
        } catch (DataIntegrityViolationException ignore) {
        } catch (Exception e) {
            log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
        }
        OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
        long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
        // 锁超时,强制释放锁并重新尝试获取
        if (lockedMillions > omsLockDO.getMaxLockTime()) {
            log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
            unlock(name);
            return tryLock(name, maxLockTime);
        }
        return false;
    }
    @Override
    public void unlock(String name) {
        try {
            CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
        }catch (Exception e) {
            log.error("[DatabaseLockService] unlock {} failed.", name, e);
        }
    }
}
DatabaseLockService基于数据库实现了LockService,其构造器依赖OmsLockRepository,同时注册了ShutdownHook,在关闭的时候执行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法创建OmsLockDO,然后执行omsLockRepository.saveAndFlush,若成功则返回,若有异常则通过omsLockRepository.findByLockName找到omsLockDO,计算加锁时间,若超过MaxLockTime则执行unlock再重新tryLock;其unlock执行omsLockRepository.deleteByLockName

NetUtils.getLocalHost

tech/powerjob/common/utils/NetUtils.java

public static String getLocalHost() {
        if (HOST_ADDRESS != null) {
            return HOST_ADDRESS;
        }
        String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS);
        if (StringUtils.isNotEmpty(addressFromJVM)) {
            log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM);
            return HOST_ADDRESS = addressFromJVM;
        }
        InetAddress address = getLocalAddress();
        if (address != null) {
            return HOST_ADDRESS = address.getHostAddress();
        }
        return LOCALHOST_VALUE;
    }
    public static InetAddress getLocalAddress() {
        if (LOCAL_ADDRESS != null) {
            return LOCAL_ADDRESS;
        }
        InetAddress localAddress = getLocalAddress0();
        LOCAL_ADDRESS = localAddress;
        return localAddress;
    }    
    private static InetAddress getLocalAddress0() {
        // @since 2.7.6, choose the {@link NetworkInterface} first
        try {
            InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface());
            if (addressOp != null) {
                return addressOp;
            }
        } catch (Throwable e) {
            log.warn("[Net] getLocalAddress0 failed.", e);
        }
        InetAddress localAddress = null;
        try {
            localAddress = InetAddress.getLocalHost();
            Optional<InetAddress> addressOp = toValidAddress(localAddress);
            if (addressOp.isPresent()) {
                return addressOp.get();
            }
        } catch (Throwable e) {
            log.warn("[Net] getLocalAddress0 failed.", e);
        }
        return localAddress;
    }
NetUtils的getLocalHost先判断HOST_ADDRESS是否有值,有则直接返回,否则先从系统属性读取powerjob.network.local.address,读取不到则取LOCAL_ADDRESS,若LOCAL_ADDRESS为null则通过getLocalAddress0获取

OmsLockDO

tech/powerjob/server/persistence/remote/model/OmsLockDO.java

@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})})
public class OmsLockDO {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
    @GenericGenerator(name = "native", strategy = "native")
    private Long id;

    private String lockName;

    private String ownerIP;
    /**
     * 最长持有锁的时间
     */
    private Long maxLockTime;

    private Date gmtCreate;

    private Date gmtModified;

    public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
        this.lockName = lockName;
        this.ownerIP = ownerIP;
        this.maxLockTime = maxLockTime;
        this.gmtCreate = new Date();
        this.gmtModified = this.gmtCreate;
    }
}
OmsLockDO定义lockName为唯一索引,它还定义了ownerIP、maxLockTime

OmsLockRepository

tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java

public interface OmsLockRepository extends JpaRepository&lt;OmsLockDO, Long&gt; {

    @Modifying
    @Transactional(rollbackOn = Exception.class)
    @Query(value = "delete from OmsLockDO where lockName = ?1")
    int deleteByLockName(String lockName);

    OmsLockDO findByLockName(String lockName);

    @Modifying
    @Transactional(rollbackOn = Exception.class)
    int deleteByOwnerIP(String ip);
}
OmsLockRepository继承了JpaRepository,它定义了deleteByLockName、findByLockName、deleteByOwnerIP方法

小结

LockService接口定义了tryLock、unlock方法;DatabaseLockService基于数据库实现了LockService,其构造器依赖OmsLockRepository,同时注册了ShutdownHook,在关闭的时候执行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法创建OmsLockDO,然后执行omsLockRepository.saveAndFlush,若成功则返回,若有异常则通过omsLockRepository.findByLockName找到omsLockDO,计算加锁时间,若超过MaxLockTime则执行unlock再重新tryLock;其unlock执行omsLockRepository.deleteByLockName。

以上就是PowerJob LockService方法工作流程源码解读的详细内容,更多关于PowerJob LockService的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文