Spring AOP实现断路器方式
作者:小马不敲代码
环境:Spring5.3.23
1. 概述
Spring Cloud体系中,断路器有Hystrix,Resilience4j,Sentinel等组件,它们的核心功能是当某个服务不可用时,断路器会屏蔽相关故障,返回一个用户预设的fallback。
具体来说,断路器有以下一些作用:
- 阻止故障的向上传递:对服务的健康状况进行监控和防护。
- 对故障快速失败并积极回复:回退并优雅降级。
- 提供三种容错方式来帮助达成目标:资源隔离,熔断和降级。
总的来说,断路器的核心作用是增强分布式系统的弹性,避免级联故障,以提高系统的整体可用性。
2. 实现方案
我们将通过使用AOP和自定义注解,实现断路器功能。根据自己的需要在关键的方法上添加注解,然后在运行时通过AOP拦截这些注解,并执行相应的断路器逻辑。
断路器的主要作用是防止故障的扩散,并保护系统的稳定性。当某个服务出现故障时,断路器可以快速中断与该服务的连接,并返回一个预设的fallback响应,从而避免故障对整个系统的影响。
通过自定义注解和AOP的结合,我们可以实现以下功能:
在需要的接口上添加自定义注解,注解中可以包含与断路器相关的配置信息,如:错误次数,时间窗口等。
通过AOP拦截这些注解,并在运行时动态地创建断路器。
当服务调用时,断路器会根据配置的逻辑判断是否需要中断连接或返回fallback响应。
如果服务正常,断路器将不会进行任何操作;如果服务故障,断路器将根据预设的逻辑进行处理。
3. 代码实现
3.1 自定义注解
AOP只会拦截该注解的方法或类。
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface PackFuse {
/**降级方法*/
String fallback() default "" ;
/**失败次数*/
int fails() default 5 ;
/**窗口时间:s*/
int windowSize() default 10 ;
}3.2 自定义断路器的状态
断路器的状态有以下几种:
- Closed(关闭状态):默认情况下,断路器处于关闭状态,允许远程服务调用正常进行。
- Open(打开状态):当远程服务调用失败次数达到预设的阈值时,断路器会自动打开,中断与该服务的所有调用,并返回fallback响应。
- Half-Open(半开状态):在一段时间后,断路器会自动从Open状态转换到Half-Open状态。在Half-Open状态下,断路器会尝试少量请求以测试服务是否已恢复。如果测试请求成功,断路器将自动关闭并恢复到Closed状态;否则,将保持Half-Open状态,如果超过指定的错误次数,则再次转变为Open状态。
状态定义
public enum EnumState {
CLOSE, HALF_OPEN, OPEN ;
}每个断路器都会自己的状态
public class PackFuseState {
/**当前状态*/
private EnumState state = EnumState.CLOSE ;
/**失败次数*/
private AtomicInteger failCount = new AtomicInteger(0) ;
/**最大失败次数*/
private int maxFailCount = 5 ;
/**窗口大小;默认每10秒重置*/
private int windowTime = 10 ;
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) ;
private Object lock = new Object() ;
public PackFuseState(int maxFailCount, int windowTime) {
this.maxFailCount = maxFailCount ;
this.windowTime = windowTime ;
executor.execute(() -> {
while(true) {
if (state == EnumState.CLOSE) {
try {
TimeUnit.SECONDS.sleep(windowTime) ;
if (state == EnumState.CLOSE) {
failCount.set(0) ;
}
} catch (InterruptedException e) {
e.printStackTrace() ;
}
} else {
synchronized (lock) {
try {
lock.wait() ;
} catch (InterruptedException e) {
e.printStackTrace() ;
}
}
}
}
}) ;
}
public EnumState getState() {
return state;
}
public void setState(EnumState state) {
this.state = state;
}
public AtomicInteger getFailCount() {
return failCount;
}
public void setFailCount(AtomicInteger failCount) {
this.failCount = failCount;
}
public int getwindowTime() {
return windowTime;
}
public void setwindowTime(int windowTime) {
this.windowTime = windowTime;
}
public PackFuseState addFailCount() {
int count = this.failCount.incrementAndGet() ;
if (count >= maxFailCount) {
this.setState(EnumState.OPEN) ;
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(windowTime) ;
setState(EnumState.HALF_OPEN) ;
failCount.set(0) ;
} catch (InterruptedException e) {
e.printStackTrace() ;
}
}) ;
}
return this ;
}
public PackFuseState closeState() {
this.setState(EnumState.CLOSE) ;
this.failCount.set(0) ;
return this ;
}
public Object getLock() {
return lock;
}
}3.3 切面定义
该切面拦截所有标有@PackFuse注解的方法
@Aspect
@Component
public class PackFuseAspect {
private static final Map<String, PackFuseState> META_HOLDER_MAP = new ConcurrentHashMap<>() ;
private static final Map<String, Object> FALLBACK = new ConcurrentHashMap<>() ;
private static final String DEFAULT_RET_DATA = "服务不可用" ;
@Pointcut("@annotation(fuse)")
private void fuse(PackFuse fuse) {}
@Around("fuse(fuse)")
public Object packFuse(ProceedingJoinPoint pjp, PackFuse fuse) {
MethodSignature joinPointObject = (MethodSignature) pjp.getSignature() ;
Class<?> targetType = joinPointObject.getDeclaringType() ;
Method method = joinPointObject.getMethod() ;
String targetKey = getKey(targetType, method);
String fallback = fuse.fallback() ;
if (!FALLBACK.containsKey(targetKey)) {
if (StringUtils.hasLength(fallback)) {
try {
Method fallbackMethod = targetType.getDeclaredMethod(fallback) ;
FALLBACK.put(targetKey, fallbackMethod.invoke(pjp.getTarget())) ;
} catch (Exception e) {
e.printStackTrace() ;
}
} else {
FALLBACK.put(targetKey, DEFAULT_RET_DATA) ;
}
}
int fails = fuse.fails() ;
int windowSize = fuse.windowSize() ;
PackFuseState fuseState = null ;
try {
fuseState = META_HOLDER_MAP.computeIfAbsent(targetKey, key -> new PackFuseState(fails, windowSize)) ;
switch (fuseState.getState()) {
case CLOSE:
return pjp.proceed() ;
case HALF_OPEN:
Random rd = new Random() ;
int c = rd.nextInt(fails) ;
if (c >= (fails / 2)) {
Object ret = pjp.proceed() ;
fuseState.closeState() ;
synchronized (fuseState.getLock()) {
fuseState.getLock().notifyAll() ;
}
return ret ;
}
return FALLBACK.get(targetKey) ;
case OPEN:
return FALLBACK.get(targetKey) ;
}
} catch (Throwable e) {
fuseState.addFailCount() ;
}
return FALLBACK.get(targetKey) ;
}
private String getKey(Class<?> targetType, Method method) {
StringBuilder builder = new StringBuilder();
builder.append(targetType.getSimpleName());
builder.append('#').append(method.getName()).append('(');
if (method.getParameterTypes().length > 0) {
builder.deleteCharAt(builder.length() - 1);
}
return builder.append(')').toString().replaceAll("[^a-zA-Z0-9]", "") ;
}
}以上就实现了一个简单的断路器功能。
通过使用AOP+自定义注解的方式成功地实现了断路器功能。这种方法给予了我们很大的灵活性和扩展性,可以轻松地对特定的服务进行故障隔离,避免故障扩散,保护整个系统的稳定性。同时,通过自定义注解,我们能够清晰地定义断路器的配置和逻辑,使代码更易于阅读和维护。
这里只是一个非常简单的小例子给大家一个实现的思路,大家可以根据自己的想法或者结合Hystrix的实现来丰富功能。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
