利用SpringBoot实现一个基于本地代理模式的RPC调用框架
作者:风象南
虽然Dubbo、gRPC等成熟框架已经为我们提供了完整的RPC解决方案,但理解其底层原理并动手实现一个简化版本,对提升我们的技术理解深度很有帮助。下面我们就来看看如何使用SpringBoot实现一个基于本地代理模式的RPC调用框架吧。
在微服务架构中,服务间通信是一个核心问题。
虽然Dubbo、gRPC等成熟框架已经为我们提供了完整的RPC解决方案,但理解其底层原理并动手实现一个简化版本,对提升我们的技术理解深度很有帮助。
本文将带你从零开始,使用SpringBoot实现一个基于本地代理模式的RPC调用框架。
整体设计思路
我们的RPC框架采用经典的代理模式设计:
接口定义:定义服务接口,客户端和服务端共享
动态代理:客户端通过JDK动态代理生成接口实现类
序列化:使用JSON进行数据序列化传输
网络通信:基于HTTP协议进行服务间通信
服务注册:服务端将接口实现注册到本地注册表,并通过HTTP端点暴露;客户端通过配置的服务地址(rpc.server.url)发起调用
核心代码实现
1. 定义RPC注解
首先创建用于标识RPC服务的注解:
// RpcService.java - 服务提供者注解 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { Class<?> value() default void.class; String version() default "1.0"; } // RpcReference.java - 服务消费者注解 @Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface RpcReference { String version() default "1.0"; long timeout() default 5000; }
2. RPC请求响应模型
定义网络传输的数据结构:
// RpcRequest.java public class RpcRequest { private String requestId; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; private String version; // 构造函数 public RpcRequest() { this.requestId = UUID.randomUUID().toString(); } // getter/setter方法 public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } } // RpcResponse.java public class RpcResponse { private String requestId; private Object result; private String error; private boolean success; public RpcResponse() {} public RpcResponse(String requestId) { this.requestId = requestId; } // getter/setter方法 public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; this.success = true; } public String getError() { return error; } public void setError(String error) { this.error = error; this.success = false; } public boolean isSuccess() { return success; } public void setSuccess(boolean success) { this.success = success; } }
3. 服务注册中心
实现简单的本地服务注册机制:
// ServiceRegistry.java @Component public class ServiceRegistry { private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class); // 服务实例注册表:接口名 -> 服务实现实例 private final Map<String, Object> serviceMap = new ConcurrentHashMap<>(); /** * 注册服务实例 */ public void registerService(Class<?> serviceInterface, String version, Object serviceImpl) { String serviceName = generateServiceName(serviceInterface, version); serviceMap.put(serviceName, serviceImpl); logger.info("注册服务成功: {} -> {}", serviceName, serviceImpl.getClass().getName()); } /** * 获取服务实例 */ public Object getService(String className, String version) { String serviceName = generateServiceName(className, version); Object service = serviceMap.get(serviceName); if (service == null) { logger.warn("未找到服务: {}", serviceName); } return service; } /** * 生成服务名称 */ private String generateServiceName(Class<?> serviceInterface, String version) { return generateServiceName(serviceInterface.getName(), version); } private String generateServiceName(String className, String version) { return className + ":" + version; } /** * 获取所有已注册的服务 */ public Set<String> getAllServices() { return new HashSet<>(serviceMap.keySet()); } }
4. RPC客户端代理工厂
这是框架的核心,通过动态代理实现透明的远程调用:
// RpcClientProxy.java @Component public class RpcClientProxy { private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class); @Autowired private RpcClient rpcClient; /** * 为指定接口创建代理实例 */ @SuppressWarnings("unchecked") public <T> T createProxy(Class<T> interfaceClass, String version, long timeout) { return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new RpcInvocationHandler(interfaceClass, version, timeout, rpcClient) ); } /** * 动态代理调用处理器 */ private static class RpcInvocationHandler implements InvocationHandler { private final Class<?> interfaceClass; private final String version; private final long timeout; private final RpcClient rpcClient; public RpcInvocationHandler(Class<?> interfaceClass, String version, long timeout, RpcClient rpcClient) { this.interfaceClass = interfaceClass; this.version = version; this.timeout = timeout; this.rpcClient = rpcClient; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 跳过Object类的基础方法 if (Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } // 构建RPC请求 RpcRequest request = buildRpcRequest(method, args); try { // 发送远程调用请求 RpcResponse response = rpcClient.sendRequest(request, timeout); if (response.isSuccess()) { return response.getResult(); } else { throw new RuntimeException("RPC调用失败: " + response.getError()); } } catch (Exception e) { logger.error("RPC调用异常: {}.{}", interfaceClass.getName(), method.getName(), e); throw new RuntimeException("RPC调用异常", e); } } /** * 构建RPC请求对象 */ private RpcRequest buildRpcRequest(Method method, Object[] args) { RpcRequest request = new RpcRequest(); request.setClassName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); request.setVersion(version); return request; } } }
5. RPC网络客户端
负责实际的网络通信:
// RpcClient.java @Component public class RpcClient { private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); @Autowired private RestTemplate restTemplate; @Value("${rpc.server.url:http://localhost:8080}") private String serverUrl; /** * 发送RPC请求 */ public RpcResponse sendRequest(RpcRequest request, long timeout) { try { logger.debug("发送RPC请求: {}.{}", request.getClassName(), request.getMethodName()); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity<RpcRequest> entity = new HttpEntity<>(request, headers); // 发送HTTP POST请求 ResponseEntity<RpcResponse> responseEntity = restTemplate.postForEntity( serverUrl + "/rpc/invoke", entity, RpcResponse.class ); RpcResponse response = responseEntity.getBody(); logger.debug("收到RPC响应: requestId={}, success={}", response.getRequestId(), response.isSuccess()); return response; } catch (Exception e) { logger.error("RPC请求发送失败", e); RpcResponse errorResponse = new RpcResponse(request.getRequestId()); errorResponse.setError("网络请求失败: " + e.getMessage()); return errorResponse; } } }
6. RPC服务端处理器
处理客户端发来的RPC请求:
@RestController @RequestMapping("/rpc") public class RpcRequestHandler { private static final Logger logger = LoggerFactory.getLogger(RpcRequestHandler.class); @Autowired private ServiceRegistry serviceRegistry; /** * 处理RPC调用请求 */ @PostMapping("/invoke") public RpcResponse handleRpcRequest(@RequestBody RpcRequest request) { RpcResponse response = new RpcResponse(request.getRequestId()); try { logger.debug("处理RPC请求: {}.{}", request.getClassName(), request.getMethodName()); // 查找服务实例 Object serviceInstance = serviceRegistry.getService(request.getClassName(), request.getVersion()); if (serviceInstance == null) { response.setError("服务未找到: " + request.getClassName()); return response; } // 通过反射调用方法 Class<?> serviceClass = serviceInstance.getClass(); Method method = serviceClass.getMethod(request.getMethodName(), request.getParameterTypes()); Object[] parameters = request.getParameters(); Class<?>[] paramTypes = method.getParameterTypes(); for (int i = 0; i < parameters.length; i++) { if (parameters[i] != null && ClassUtil.isBasicType(paramTypes[i])) { // 处理基本类型转换(如客户端传的是包装类型,服务端是基本类型) parameters[i] = convertType(paramTypes[i], parameters[i]); } } Object result = method.invoke(serviceInstance, parameters); response.setResult(result); logger.debug("RPC调用成功: {}.{}", request.getClassName(), request.getMethodName()); } catch (Exception e) { logger.error("RPC调用处理异常", e); response.setError("方法调用异常: " + e.getMessage()); } return response; } /** * 类型转换处理(支持基本类型和包装类互转) */ private Object convertType(Class<?> targetType, Object value) { // 处理null值 if (value == null) return null; // 类型匹配时直接返回 if (targetType.isInstance(value)) { return value; } // 处理数字类型转换 if (value instanceof Number) { Number number = (Number) value; if (targetType == int.class || targetType == Integer.class) return number.intValue(); if (targetType == long.class || targetType == Long.class) return number.longValue(); if (targetType == double.class || targetType == Double.class) return number.doubleValue(); if 
(targetType == float.class || targetType == Float.class) return number.floatValue(); if (targetType == byte.class || targetType == Byte.class) return number.byteValue(); if (targetType == short.class || targetType == Short.class) return number.shortValue(); } // 处理布尔类型转换 if (targetType == boolean.class || targetType == Boolean.class) { if (value instanceof Boolean) return value; return Boolean.parseBoolean(value.toString()); } // 处理字符类型转换 if (targetType == char.class || targetType == Character.class) { String str = value.toString(); if (!str.isEmpty()) return str.charAt(0); } throw new IllegalArgumentException(String.format( "类型转换失败: %s -> %s", value.getClass().getSimpleName(), targetType.getSimpleName() )); } /** * 查询已注册的服务列表 */ @GetMapping("/services") public Set<String> getRegisteredServices() { return serviceRegistry.getAllServices(); } }
7. 自动配置和Bean后处理器
实现Spring Boot的自动装配:
// RpcAutoConfiguration.java @Configuration @ComponentScan(basePackages = "com.example.rpc") public class RpcAutoConfiguration { @Bean @ConditionalOnMissingBean public RestTemplate restTemplate() { RestTemplate restTemplate = new RestTemplate(); // 设置连接超时 HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(); factory.setConnectTimeout(3000); factory.setReadTimeout(10000); restTemplate.setRequestFactory(factory); return restTemplate; } } // RpcServiceProcessor.java - 处理@RpcService注解 @Component public class RpcServiceProcessor implements BeanPostProcessor, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(RpcServiceProcessor.class); private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> beanClass = bean.getClass(); // 检查是否有@RpcService注解 RpcService rpcService = beanClass.getAnnotation(RpcService.class); if (rpcService != null) { registerRpcService(bean, rpcService); } return bean; } /** * 注册RPC服务 */ private void registerRpcService(Object serviceBean, RpcService rpcService) { ServiceRegistry serviceRegistry = applicationContext.getBean(ServiceRegistry.class); Class<?> interfaceClass = rpcService.value(); if (interfaceClass == void.class) { // 如果没有指定接口,自动查找第一个接口 Class<?>[] interfaces = serviceBean.getClass().getInterfaces(); if (interfaces.length > 0) { interfaceClass = interfaces[0]; } else { logger.warn("无法确定服务接口: {}", serviceBean.getClass().getName()); return; } } serviceRegistry.registerService(interfaceClass, rpcService.version(), serviceBean); } } // RpcReferenceProcessor.java - 处理@RpcReference注解 @Component public class RpcReferenceProcessor implements BeanPostProcessor, ApplicationContextAware { private static final Logger 
logger = LoggerFactory.getLogger(RpcReferenceProcessor.class); private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Class<?> beanClass = bean.getClass(); // 处理所有标注了@RpcReference的字段 Field[] fields = beanClass.getDeclaredFields(); for (Field field : fields) { RpcReference rpcReference = field.getAnnotation(RpcReference.class); if (rpcReference != null) { injectRpcReference(bean, field, rpcReference); } } return bean; } /** * 注入RPC服务代理 */ private void injectRpcReference(Object bean, Field field, RpcReference rpcReference) { try { RpcClientProxy rpcClientProxy = applicationContext.getBean(RpcClientProxy.class); Class<?> interfaceClass = field.getType(); Object proxyInstance = rpcClientProxy.createProxy( interfaceClass, rpcReference.version(), rpcReference.timeout() ); field.setAccessible(true); field.set(bean, proxyInstance); logger.info("注入RPC服务代理: {}", interfaceClass.getName()); } catch (Exception e) { logger.error("RPC服务代理注入失败: {}", field.getName(), e); throw new RuntimeException("RPC服务代理注入失败", e); } } }
使用示例
1. 定义服务接口
// UserService.java public interface UserService { String getUserName(Long userId); boolean updateUser(Long userId, String name); List<String> getUserList(); }
2. 实现服务提供者
// UserServiceImpl.java @RpcService(UserService.class) public class UserServiceImpl implements UserService { private static final Map<Long, String> userDatabase = new ConcurrentHashMap<>(); static { userDatabase.put(1L, "张三"); userDatabase.put(2L, "李四"); userDatabase.put(3L, "王五"); } @Override public String getUserName(Long userId) { String userName = userDatabase.get(userId); return userName != null ? userName : "用户不存在"; } @Override public boolean updateUser(Long userId, String name) { if (userDatabase.containsKey(userId)) { userDatabase.put(userId, name); return true; } return false; } @Override public List<String> getUserList() { return new ArrayList<>(userDatabase.values()); } }
3. 创建服务消费者
// UserController.java @RestController @RequestMapping("/user") public class UserController { @RpcReference private UserService userService; @GetMapping("/{userId}") public String getUser(@PathVariable Long userId) { return userService.getUserName(userId); } @PostMapping("/{userId}") public boolean updateUser(@PathVariable Long userId, @RequestParam String name) { return userService.updateUser(userId, name); } @GetMapping("/list") public List<String> getUserList() { return userService.getUserList(); } }
4. 启动类配置
// Application.java @SpringBootApplication @EnableAutoConfiguration public class RpcDemoApplication { public static void main(String[] args) { SpringApplication.run(RpcDemoApplication.class, args); } }
5. 配置文件
# application.yml

# HTTP port of this application
server:
  port: 8080

# Base URL the RPC client posts requests to (read by RpcClient as ${rpc.server.url})
rpc:
  server:
    url: http://localhost:8080

# Log level for the com.example.rpc package
logging:
  level:
    com.example.rpc: DEBUG
测试验证
启动应用后,可以通过以下方式测试:
# Look up a user
curl http://localhost:8080/user/1

# Update a user. The name is sent URL-encoded (as a form parameter) rather than
# as raw non-ASCII characters in the query string.
curl -X POST --data-urlencode "name=新名字" http://localhost:8080/user/1

# List users
curl http://localhost:8080/user/list

# Show the registered RPC services
curl http://localhost:8080/rpc/services
总结
这个实现虽然相对简单,但完整展现了RPC框架的核心思想。
在实际项目中,建议使用成熟的RPC框架如Dubbo或Spring Cloud,但理解底层原理对我们选择和优化技术方案很有价值。
到此这篇关于利用SpringBoot实现一个基于本地代理模式的RPC调用框架的文章就介绍到这了。更多相关SpringBoot实现RPC调用的内容,请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!