java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring AOP实现断路器

Spring AOP实现断路器方式

作者:小马不敲代码

这篇文章主要介绍了Spring AOP实现断路器方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

环境:Spring5.3.23

1. 概述

Spring Cloud体系中,断路器有Hystrix,Resilience4j,Sentinel等组件,它们的核心功能是当某个服务不可用时,断路器会屏蔽相关故障,返回一个用户预设的fallback。

具体来说,断路器有以下一些作用:

总的来说,断路器的核心作用是增强分布式系统的弹性,避免级联故障,以提高系统的整体可用性。

2. 实现方案

我们将通过使用AOP和自定义注解,实现断路器功能。根据自己的需要在关键的方法上添加注解,然后在运行时通过AOP拦截这些注解,并执行相应的断路器逻辑。

断路器的主要作用是防止故障的扩散,并保护系统的稳定性。当某个服务出现故障时,断路器可以快速中断与该服务的连接,并返回一个预设的fallback响应,从而避免故障对整个系统的影响。

通过自定义注解和AOP的结合,我们可以实现以下功能:

在需要的接口上添加自定义注解,注解中可以包含与断路器相关的配置信息,如:错误次数,时间窗口等。

通过AOP拦截这些注解,并在运行时动态地创建断路器。

当服务调用时,断路器会根据配置的逻辑判断是否需要中断连接或返回fallback响应。

如果服务正常,断路器将不会进行任何操作;如果服务故障,断路器将根据预设的逻辑进行处理。

3. 代码实现

3.1 自定义注解

AOP只会拦截该注解的方法或类。

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface PackFuse {

  /**降级方法*/
  String fallback() default "" ;

  /**失败次数*/
  int fails() default 5 ;

  /**窗口时间:s*/
  int windowSize() default 10 ;

}

3.2 自定义断路器的状态

断路器的状态有以下几种:

状态定义

public enum EnumState {

  CLOSE, HALF_OPEN, OPEN ;
  
}

每个断路器都会自己的状态

public class PackFuseState {

  /**当前状态*/
  private EnumState state = EnumState.CLOSE ;
  /**失败次数*/
  private AtomicInteger failCount = new AtomicInteger(0) ;
  /**最大失败次数*/
  private int maxFailCount = 5 ;
  /**窗口大小;默认每10秒重置*/
  private int windowTime = 10 ;
  
  private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) ;
  
  private Object lock = new Object() ;
  
  public PackFuseState(int maxFailCount, int windowTime) {
    this.maxFailCount = maxFailCount ;
    this.windowTime = windowTime ;
    executor.execute(() -> {
      while(true) {
        if (state == EnumState.CLOSE) {
          try {
            TimeUnit.SECONDS.sleep(windowTime) ;
            if (state == EnumState.CLOSE) {
              failCount.set(0) ;
            }
          } catch (InterruptedException e) {
            e.printStackTrace() ;
          }
        } else {
          synchronized (lock) {
            try {
              lock.wait() ;
            } catch (InterruptedException e) {
              e.printStackTrace() ;
            }
          }
        }
      }
    }) ;
  }
  
  public EnumState getState() {
    return state;
  }
  public void setState(EnumState state) {
    this.state = state;
  }
  public AtomicInteger getFailCount() {
    return failCount;
  }
  public void setFailCount(AtomicInteger failCount) {
    this.failCount = failCount;
  }
  public int getwindowTime() {
    return windowTime;
  }
  public void setwindowTime(int windowTime) {
    this.windowTime = windowTime;
  }
  
  public PackFuseState addFailCount() {
    int count = this.failCount.incrementAndGet() ;
    if (count >= maxFailCount) {
      this.setState(EnumState.OPEN) ;
      executor.execute(() -> {
        try {
          TimeUnit.SECONDS.sleep(windowTime) ;
          setState(EnumState.HALF_OPEN) ;
          failCount.set(0) ;
        } catch (InterruptedException e) {
          e.printStackTrace() ;
        }
      }) ;
    }
    return this ;
  }
  
  public PackFuseState closeState() {
    this.setState(EnumState.CLOSE) ;
    this.failCount.set(0) ;
    return this ;
  }

  public Object getLock() {
    return lock;
  }

}

3.3 切面定义

该切面拦截所有标有@PackFuse注解的方法

@Aspect
@Component
public class PackFuseAspect {

  private static final Map<String, PackFuseState> META_HOLDER_MAP = new ConcurrentHashMap<>() ;
  private static final Map<String, Object> FALLBACK = new ConcurrentHashMap<>() ;
  private static final String DEFAULT_RET_DATA = "服务不可用" ;
  
  @Pointcut("@annotation(fuse)")
  private void fuse(PackFuse fuse) {}
  
  @Around("fuse(fuse)")
  public Object packFuse(ProceedingJoinPoint pjp, PackFuse fuse) {
    MethodSignature joinPointObject = (MethodSignature) pjp.getSignature() ;
    Class<?> targetType = joinPointObject.getDeclaringType() ;
    Method method = joinPointObject.getMethod() ;
    String targetKey = getKey(targetType, method);
    
    String fallback = fuse.fallback() ;
    if (!FALLBACK.containsKey(targetKey)) {
      if (StringUtils.hasLength(fallback)) {
        try {
          Method fallbackMethod = targetType.getDeclaredMethod(fallback) ;
          FALLBACK.put(targetKey, fallbackMethod.invoke(pjp.getTarget())) ;
        } catch (Exception e) {
          e.printStackTrace() ;
        }
      } else {
        FALLBACK.put(targetKey, DEFAULT_RET_DATA) ;
      }
    }
    int fails = fuse.fails() ;
    int windowSize = fuse.windowSize() ;
    
    PackFuseState fuseState = null ;
    try {
      fuseState = META_HOLDER_MAP.computeIfAbsent(targetKey, key -> new PackFuseState(fails, windowSize)) ;
      switch (fuseState.getState()) {
        case CLOSE:
          return pjp.proceed() ;
        case HALF_OPEN:
          Random rd = new Random() ;
          int c = rd.nextInt(fails) ;
          if (c >= (fails / 2)) {
            Object ret = pjp.proceed() ;
            fuseState.closeState() ;
            synchronized (fuseState.getLock()) {
              fuseState.getLock().notifyAll() ; 
            }
            return ret ;
          }
          return FALLBACK.get(targetKey) ;
        case OPEN:
          return FALLBACK.get(targetKey) ;
      }
    } catch (Throwable e) {
      fuseState.addFailCount() ;
    }
    return FALLBACK.get(targetKey) ;
  }
  
  private String getKey(Class<?> targetType, Method method) {
    StringBuilder builder = new StringBuilder();
    builder.append(targetType.getSimpleName());
    builder.append('#').append(method.getName()).append('(');
    if (method.getParameterTypes().length > 0) {
      builder.deleteCharAt(builder.length() - 1);
    }
    return builder.append(')').toString().replaceAll("[^a-zA-Z0-9]", "") ;
  }
  
}

以上就实现了一个简单的断路器功能。

通过使用AOP+自定义注解的方式成功地实现了断路器功能。这种方法给予了我们很大的灵活性和扩展性,可以轻松地对特定的服务进行故障隔离,避免故障扩散,保护整个系统的稳定性。同时,通过自定义注解,我们能够清晰地定义断路器的配置和逻辑,使代码更易于阅读和维护。

这里只是一个非常简单的小例子给大家一个实现的思路,大家可以根据自己的想法或者结合Hystrix的实现来丰富功能。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文