java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring cloud 非阻塞式微服务

搭建Spring cloud 非阻塞式微服务架构的实现

作者:ciku

本文介绍搭建非阻塞式微服务架构,采用Spring Gateway、Spring Security、JWT与Redis,集成EurekaServer、Auth-service、User-service等服务,实现用户认证、日志跟踪及统一入口管理,感兴趣的可以了解一下

搭建一个非阻塞式微服务架构

本文简述如何搭建一个非阻塞式的微服务架构,使用Spring gateway进行路由配置、Spring Security 进行用户登录、生成JWT 令牌、token校验,使用Redis存储token信息,使用mysql数据库存储用户信息;使用全局traceId搭配gateway、slf4j实现全局日志跟踪。主要有四个子服务组成eurekaServer、gateway、auth-service、user-service。eurekaServer负责服务注册与发现、gateway负责统一的入口配置、auth-service 负责登录、认证;user-service用户管理。

版本介绍:

父工程配置

pom.xml
引入基本的webflux、devtools、数据库连接工具、ribbon与OpenFeign依赖、spring colud管理包、定义子服务等。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>TraceLogDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>TraceLogDemo</name>
    <description>TraceLogDemo</description>
    <packaging>pom</packaging> <!-- 不打包成jar-->
    <modules><!-- 子服务-->
        <module>eurekaServer</module>
        <module>gateway</module>
        <module>user-service</module>
        <module>auth-service</module>
    </modules>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2025.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

搭建eurekaServer

1.pom.xml
pom中修改parent为父工程的groupId、artifactid等信息,引入spring-cloud-starter-netflix-eureka-server依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>com.example</groupId>
		<artifactId>TraceLogDemo</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<artifactId>eurekaServer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>eurekaServer</name>
	<description>eurekaServer</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<!--eureka Server依赖-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

2.修改配置文件

spring:
  application:
   name: eurekaServer
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false #是否注册为客户端
    fetch-registry: false 

3.修改启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(EurekaServerApplication.class, args);
	}

}

启动后访问locahost:8761,显示如下界面:

搭建Auth-Service认证服务

1.pom.xml
核心引入Spring security和jwt依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>com.example</groupId>
		<artifactId>TraceLogDemo</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<artifactId>auth-service</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>auth-service</name>
	<description>auth-service</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
		</dependency>
		<!--spring security依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-web</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.springframework.cloud</groupId>
					<artifactId>spring-cloud-starter-netflix-zuul</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>jakarta.servlet</groupId>
			<artifactId>jakarta.servlet-api</artifactId>
			<version>6.0.0</version>
			<scope>provided</scope>
		</dependency>
		<!--引入jwt依赖-->
		<dependency>
			<groupId>io.jsonwebtoken</groupId>
			<artifactId>jjwt-api</artifactId>
			<version>0.11.5</version>
		</dependency>
		<dependency>
			<groupId>io.jsonwebtoken</groupId>
			<artifactId>jjwt-impl</artifactId>
			<version>0.11.5</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>io.jsonwebtoken</groupId>
			<artifactId>jjwt-jackson</artifactId>
			<version>0.11.5</version>
			<scope>runtime</scope>
		</dependency>
		<!--mybatis plus依赖-->
		<dependency>
			<groupId>com.baomidou</groupId>
			<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
			<version>3.5.10</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<!--redis依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

2.修改配置文件
数据库配置、redis配置、mybatis plus配置、eureka注册

spring:
  application:
    name: auth-service
  datasource:
    url: jdbc:mysql://192.168.48.164:3306/db01?severTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  main:
    allow-bean-definition-overriding: true
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
  data:
    redis:
      host: 192.168.48.164
      port: 6379
      password: 123456
      database: 10
server:
  port: 8082
mybatis-plus:
  mapper-locations: classpath*:mapper/**/*.xml
  type-aliases-package: com.example.entity
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/

3.业务代码
由于此处代码很多省略基础查询代码以及redis相关代码。只给核心代码
SecurityConfig.java:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.config.annotation.web.configuration.WebSecurityCustomizer;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
import reactor.core.publisher.Mono;

/**
 * @ClassName:SecurityConfig
 * @Author: xuli
 * @Date: 2025/8/26 18:22
 * @Description: 必须描述类做什么事情, 实现什么功能
 */
@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {
    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
                .csrf(ServerHttpSecurity.CsrfSpec::disable)//禁用csrf
                .formLogin(ServerHttpSecurity.FormLoginSpec::disable)//禁用表单登录
                .httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)//禁用基础登录
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/auth/**","/user/**").permitAll()//允许/auth/**所有的
                        .anyExchange().authenticated()//其他都要授权
                )
                .exceptionHandling(handling -> handling
                        .authenticationEntryPoint((exchange, ex) ->
                                Mono.fromRunnable(() -> {//设置异常状态
                                    exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                                }))
                        .accessDeniedHandler((exchange, denied) ->
                                Mono.fromRunnable(() -> {
                                    exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
                                }))
                )
                .build();
    }
    /**
     * 响应式认证管理器(核心组件)
     * @param userDetailsService 实现ReactiveUserDetailsService接口的用户服务
     * @param passwordEncoder 已配置的密码编码器
     * @return 认证处理链,完成凭证验证和权限注入
     */
    @Bean
    public ReactiveAuthenticationManager AuthenticationManager(
            ReactiveUserDetailsService userDetailsService,
            PasswordEncoder passwordEncoder) {

        return authentication -> {//调用userDetailsService实现校验
            UsernamePasswordAuthenticationToken authToken =
                    (UsernamePasswordAuthenticationToken) authentication;
            return userDetailsService.findByUsername(authToken.getName())
                    .filter(user -> passwordEncoder.matches(
                            authToken.getCredentials().toString(),
                            user.getPassword()
                    ))
                    .switchIfEmpty(Mono.error(new BadCredentialsException("Invalid credentials")))
                    .map(user -> new UsernamePasswordAuthenticationToken(
                            user.getUsername(),
                            user.getPassword(),
                            user.getAuthorities()
                    ));
        };
    }
    @Bean
    public WebSecurityCustomizer webSecurityCustomizer() {
        return web -> web.debug(true);
    }
    /**
     * BCrypt密码编码器(安全必须)
     * @return 强度10的BCrypt实现
     */
    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
    @Bean
    public TraceWebFilter TraceWebFilter() {
        return new TraceWebFilter();
    }
    @Bean
    public JwtAuthenticationFilter jwtAuthenticationFilter() {
        return new JwtAuthenticationFilter();
    }


}

全局日志拦截过滤器:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

/**
 * @ClassName:TraceWebFilter
 * @Author: ciku
 * @Date: 2025/8/28 15:54
 * @Description: 全局traceFilter,用于从request中获取traceId设置在MDC中
 */
@Component
public class TraceWebFilter implements WebFilter {
    private static final String TRACE_ID_HEADER = "X-Trace-Id";
    private Logger logger= LoggerFactory.getLogger(TraceWebFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return Mono.defer(()->{
            String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);//获取header里面的traceId
            if(!traceId.isBlank()){
                MDC.put("traceId",traceId);
            }
            return chain.filter(exchange);

        });
    }
}

JwtUtil:

package com.example.authservice.util;

import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Date;
/**
 * @ClassName:JwtUtil
 * @Author: xuli
 * @Date: 2025/8/26 21:48
 * @Description: 必须描述类做什么事情, 实现什么功能
 **/
@Component
public class JwtUtil {
    private static final String SECRET_KEY = "u/4W2VX5J6QbLdE9NfG3BcH7YzA1kDmP8sR0tIw=";
    private static final long EXPIRATION_TIME = 86400000; // 24小时
    private static Logger logger= LoggerFactory.getLogger(JwtUtil.class);

    /**
     * 根据用户名生成token
     * @param username
     * @return
     */
    public static String generateToken(String username) {
        String token = Jwts.builder()
                .setSubject(username)
                .setIssuedAt(new Date())
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(Keys.hmacShaKeyFor(SECRET_KEY.getBytes()))
                .compact();
        logger.info("生成token成功!token:"+token);
        return token;
    }

}

获取用户对象,实现ReactiveUserDetailsService 接口是Spring security提供的SPI接口。

import com.example.authservice.user.entity.User;
import com.example.authservice.user.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.ArrayList;

/**
 * @ClassName:UserDetailsServiceImpl
 * @Author: xuli
 * @Date: 2025/8/26 22:20
 * @Description: 实现ReactiveUserDetailsService的方法,进行用户名密码校验
 */

@Service
public class UserDetailsServiceImpl implements ReactiveUserDetailsService {
    @Autowired
    private UserService userService;

    @Override
    public Mono<UserDetails> findByUsername(String username) {
        return Mono.just(loadUserByUsername(username));
    }
    private UserDetails loadUserByUsername(String username) {
        User user = userService.findUserByUsername(username);//查询数据库的用户信息
        if(user==null){
            throw new UsernameNotFoundException("user not found");
        }
        return new org.springframework.security.core.userdetails.User(user.getUserName(),user.getPassword(),
                new ArrayList<>()
        );
    }
}

所有业务自服务需要增加 logback-spring.xml,后续不再重复:

<configuration>
    <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] [%X{traceId}] %-5level %logger{36} - %msg%n"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

搭建Gateway网关服务

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>TraceLogDemo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>gateway</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gateway</name>
    <description>gateway</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <!--webflux对应的gateway依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway-server-webflux</artifactId>
        </dependency>
        <!--基于netflix的eureka client依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.配置文件
注册到 http://localhost:8761 eureka服务端

spring:
  application:
   name: gateway
server:
  port: 8080
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
    fetch-registry: true
    register-with-eureka: true

3.启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient  //标记为客户端
public class GatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }

}

4.配置路由信息

import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * @ClassName:BeanConfig
 * @Author: xuli
 * @Date: 2025/8/26 16:55
 * @Description: 配置网关路由
 */
@Configuration
public class RuteConfiguration {
    @Bean
    public RouteLocator myRoutes(RouteLocatorBuilder builder){
        return builder.routes()
                .route("user-service",p->p.path("/userService/**")//设置路由,以userService开头的都调用user-service服务
                        .filters(f->f.stripPrefix(1))
                        .uri("lb://user-service"))
                .route("auth-service",p->p.path("/authService/**")//设置路由,以authService开头的都调用auth-service服务
                        .filters(f->f.stripPrefix(1))
                        .uri("lb://auth-service")//默认规则lb://服务名,eureka大小写不限,nacos小大写区分
                )
                .build();
    }

}

5.定义jwt校验返回对象

package com.example.gateway.config;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;

/**
 * @ClassName:JwtResponse
 * @Author: xuli
 * @Date: 2025/8/26 22:56
 * @Description: jwt校验返回对象
 */
@Data
@JsonInclude
public class JwtResponse {
    private String token;
    private boolean isExpiration;
    private String error;

    // 必须有无参构造函数
    public JwtResponse() {}

    public JwtResponse(String token, String error) {
        this.token = token;
        this.error=error;
    }

    public JwtResponse(String token, boolean isExpiration, String error) {
        this.token = token;
        this.isExpiration = isExpiration;
        this.error = error;
    }

    public String getToken() {
        return token;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public boolean isExpiration() {
        return isExpiration;
    }

    public void setExpiration(boolean expiration) {
        isExpiration = expiration;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }
}

6.配置过滤器
用于请求拦截,获取jwt 令牌,再调用auth-service校验,并设置全局traceId

package com.example.gateway.filter;

import com.example.gateway.config.JwtResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/**
 * @ClassName:JwtAuthFilter
 * @Author: xuli
 * @Date: 2025/8/27 23:50
 * @Description: JWT过滤器,接口调用是校验token并设置traceId
 */
@Component
public class JwtAuthFilter implements GlobalFilter {
    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory;
    private static final Logger logger = LoggerFactory.getLogger(JwtAuthFilter.class);
    private final static String TRACE_HEADER="X-Trace-Id";

    private final WebClient webClient;
    public JwtAuthFilter(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory
            ,WebClient.Builder webClientBuilder,LoadBalancerClient loadBalancerClient){
        this.loadBalancerFactory = loadBalancerFactory;
        this.webClient = webClientBuilder
                .clientConnector(new ReactorClientHttpConnector(
                        HttpClient.create()
                                .responseTimeout(Duration.ofSeconds(10))
                                .keepAlive(true)
                )).build();
    }
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    ReactiveLoadBalancer<ServiceInstance> loadBalancer =  loadBalancerFactory.getInstance("auth-service");//获取实际地址
    ServerHttpRequest request = exchange.getRequest();
    String path = exchange.getRequest().getURI().getPath();
    if("/auth/login".equals(path)){//如果是登录接口直接放行
        //增加全局traceId
        String  traceId= UUID.randomUUID().toString();
        MDC.put("traceId",traceId);
        //traceId设置在请求头中
        ServerHttpRequest mutatedRequest = exchange.getRequest()
                .mutate()
                .header("X-Trace-Id",traceId)
                .build();
        return chain.filter(exchange.mutate().request(mutatedRequest).build());
    }
    String loginName = extractLoginName(request);
    String token = extractToken(request);
    return Flux.from(loadBalancer.choose())
            .take(1)
            .single()
            .flatMap(response->{
                ServiceInstance instance = response.getServer();//获取返回的实例
                if(instance!=null){
                    logger.info("获取到实例地址"+instance.getUri());
                }
                //生成traceId
                String  traceId= UUID.randomUUID().toString();
                MDC.put("traceId",traceId);

                //调用校验token接口
                return webClient.get()
                        .uri( instance.getUri()+"/auth/validateToken")
                        .header(HttpHeaders.AUTHORIZATION,token)
                        .header("X-User-Id",loginName)
                        .header("Content-Type","application/json")
                        .header("X-Trace-Id",traceId)
                        .retrieve()
                        .bodyToMono(JwtResponse.class)//设置返回类型,与接口返回类型一直
                        .flatMap(jwtResponse -> {
                            if(!jwtResponse.isExpiration()){//获取返回结果
                                logger.info("接口校验失败,返回"+jwtResponse.isExpiration()+"请确认token是否过期");
                            }
                            if (jwtResponse.isExpiration()) {//校验通过,调用请求的接口
                                ServerHttpRequest mutatedRequest = exchange.getRequest()
                                        .mutate()
                                        .header("X-User-Id", loginName)
                                        .header("Authorization")
                                        .header("X-Trace-Id",traceId)
                                        .build();
                                logger.info(instance.getUri()+"/auth/validateToken"+"校验成功");
                                return chain.filter(exchange.mutate().request(mutatedRequest).build());
                            }
                            return Mono.error(new ResponseStatusException(HttpStatus.UNAUTHORIZED));//失败返回401
                        });
            })
            .onErrorResume(e -> {//异常处理
                 logger.error(e.getMessage());
                exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                return exchange.getResponse().setComplete();
            });

}

    /**
     * 未授权设置401
     * @param exchange
     * @param error
     * @return
     */
    private Mono<Void> unauthorized(ServerWebExchange exchange, String error) {
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        return exchange.getResponse().setComplete();
    }

    /**
     * 获取token
     * @param request
     * @return
     */
    private String extractToken(ServerHttpRequest request) {
        HttpHeaders headers = request.getHeaders();
        List<String> list = headers.get("Authorization");
        if(!CollectionUtils.isEmpty(list)){
            String string = list.get(0);
            return !string.isBlank()?string.replaceAll("Bearer ", ""):"";
        }
        return "";
    }

    /**
     * 获取登录名
     * @param request
     * @return
     */
    private String extractLoginName(ServerHttpRequest request) {
        HttpHeaders headers = request.getHeaders();
        List<String> list = headers.get("X-User-Id");
        if(!CollectionUtils.isEmpty(list)){
            String string = list.get(0);
            return !string.isBlank()?string:"";
        }
        return "";
    }

}

搭建业务服务 UserSevice

userService简单实现,仅供演示
1.pom.xml
只需引入eureka-client

 <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

2.全局trace过滤器

@Component
public class TraceWebFilter implements WebFilter {
    private static final String TRACE_ID_HEADER = "X-Trace-Id";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return Mono.defer(()->{

            String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
            return chain.filter(exchange)
                    .contextWrite(ctx->ctx.put("traceId", traceId))
                    .doOnEach(sinal->{
                        if(sinal.isOnComplete()){
                            MDC.put("traceId",traceId);//设置在slf4j的日志对象中
                        }
                    });

        });
    }
}

3.controller简单实现

@RestController
@RequestMapping("/user")
public class TestController {
    private Logger logger= LoggerFactory.getLogger(TestController.class);

    @GetMapping("/sayHello")
    public String sayHello(){
        //todo
        logger.info("调用user-service/user/sayHello接口成功!");
        return "welcome User service";
    }
}

配置文件就不再赘述,只需要注册到eurake中即可。

运行结果

1.未登录调用userService

2.全局日志跟踪

3.用户登录成功

采用统一入口调用登录方法,成功返回token,并记录在redis中

4.携带最新token访问user-Service

5.失效token或错误token

到此这篇关于搭建Spring cloud 非阻塞式微服务架构的实现的文章就介绍到这了,更多相关Spring cloud 非阻塞式微服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文