PowerJob的AliOssService工作流程源码解读
作者:codecraft
这篇文章主要介绍了PowerJob的AliOssServiceg工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
序
本文主要研究一下PowerJob的AliOssService
DFsService
tech/powerjob/server/extension/dfs/DFsService.java
public interface DFsService { /** * 存储文件 * @param storeRequest 存储请求 * @throws IOException 异常 */ void store(StoreRequest storeRequest) throws IOException; /** * 下载文件 * @param downloadRequest 文件下载请求 * @throws IOException 异常 */ void download(DownloadRequest downloadRequest) throws IOException; /** * 获取文件元信息 * @param fileLocation 文件位置 * @return 存在则返回文件元信息 * @throws IOException 异常 */ Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException; /** * 清理 powerjob 认为“过期”的文件 * 部分存储系统自带生命周期管理(如阿里云OSS,则不需要单独实现该方法) * @param bucket bucket * @param days 天数,需要清理超过 X 天的文件 */ default void cleanExpiredFiles(String bucket, int days) { } }
DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法
AbstractDFsService
tech/powerjob/server/persistence/storage/AbstractDFsService.java
@Slf4j public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean { protected ApplicationContext applicationContext; public AbstractDFsService() { log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName()); } abstract protected void init(ApplicationContext applicationContext); protected static final String PROPERTY_KEY = "oms.storage.dfs"; protected static String fetchProperty(Environment environment, String dfsType, String key) { String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key); return environment.getProperty(pKey); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName()); init(applicationContext); } }
AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init
AliOssService
tech/powerjob/server/persistence/storage/impl/AliOssService.java
@Slf4j @Priority(value = Integer.MAX_VALUE - 1) @Conditional(AliOssService.AliOssCondition.class) public class AliOssService extends AbstractDFsService { private static final String TYPE_ALI_OSS = "alioss"; private static final String KEY_ENDPOINT = "endpoint"; private static final String KEY_BUCKET = "bucket"; private static final String KEY_CREDENTIAL_TYPE = "credential_type"; private static final String KEY_AK = "ak"; private static final String KEY_SK = "sk"; private static final String KEY_TOKEN = "token"; private OSS oss; private String bucket; private static final int DOWNLOAD_PART_SIZE = 10240; private static final String NO_SUCH_KEY = "NoSuchKey"; //...... void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception { log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token); if (StringUtils.isEmpty(bucket)) { throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob"); } this.bucket = bucket; CredentialsProvider credentialsProvider; CredentialType credentialType = CredentialType.parse(mode); switch (credentialType) { case PWD: credentialsProvider = new DefaultCredentialProvider(ak, sk, token); break; case SYSTEM_PROPERTY: credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider(); break; default: credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider(); } this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider); log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER."); } //...... }
AliOssService继承了AbstractDFsService
store
@Override public void store(StoreRequest storeRequest) throws IOException { ObjectMetadata objectMetadata = new ObjectMetadata(); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata); oss.putObject(putObjectRequest); }
store方法创建PutObjectRequest,使用oss.putObject进行上传
download
@Override public void download(DownloadRequest downloadRequest) throws IOException { FileLocation dfl = downloadRequest.getFileLocation(); DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE); try { FileUtils.forceMkdirParent(downloadRequest.getTarget()); oss.downloadFile(downloadFileRequest); } catch (Throwable t) { ExceptionUtils.rethrow(t); } }
download方法则根据DownloadRequest指定的FileLocation创建DownloadFileRequest,然后通过oss.downloadFile(downloadFileRequest)进行下载
fetchFileMeta
@Override public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException { try { ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation)); return Optional.ofNullable(objectMetadata).map(ossM -> { Map<String, Object> metaInfo = Maps.newHashMap(); metaInfo.putAll(ossM.getRawMetadata()); if (ossM.getUserMetadata() != null) { metaInfo.putAll(ossM.getUserMetadata()); } return new FileMeta() .setLastModifiedTime(ossM.getLastModified()) .setLength(ossM.getContentLength()) .setMetaInfo(metaInfo); }); } catch (OSSException oe) { String errorCode = oe.getErrorCode(); if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) { return Optional.empty(); } ExceptionUtils.rethrow(oe); } return Optional.empty(); }
fetchFileMeta方法通过oss.getObjectMetadata获取ObjectMetadata
cleanExpiredFiles
@Override public void cleanExpiredFiles(String bucket, int days) { /* 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 */ }
cleanExpiredFiles则是空操作
init
protected void init(ApplicationContext applicationContext) { Environment environment = applicationContext.getEnvironment(); String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT); String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET); String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE); String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK); String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK); String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN); try { initOssClient(endpoint, bkt, ct, ak, sk, token); } catch (Exception e) { ExceptionUtils.rethrow(e); } }
init则是通过environment获取相关属性,然后执行initOssClient
小结
DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init;AliOssService继承了AbstractDFsService,通过ossClient实现了store、download、fetchFileMeta方法。
以上就是PowerJob的AliOssServiceg工作流程源码解读的详细内容,更多关于PowerJob AliOssServiceg的资料请关注脚本之家其它相关文章!