Nacos服务注册与发现原理解读
作者:shangjg3
Nacos是阿里巴巴开源的服务注册与发现组件,同时也提供配置管理功能。它支持基于DNS和RPC的服务发现,致力于帮助开发人员发现、配置和管理微服务。
下面将深入解析其服务注册与发现的核心原理,并提供关键源码示例。
核心原理架构
Nacos的服务注册与发现架构主要包含三个核心组件:
- 服务提供者:负责将自身服务实例注册到Nacos Server
- 服务消费者:负责从Nacos Server获取服务列表并进行服务调用
- Nacos Server:负责维护服务实例信息,处理注册、发现、健康检查等请求
服务注册原理
服务注册是指服务提供者将自身服务实例的信息注册到Nacos Server的过程。
其核心流程如下:
- 1. 服务实例启动时,通过SDK向Nacos Server发送注册请求
- 2. Nacos Server接收请求并验证信息,将服务实例信息存储到内存和持久化存储中
- 3. 服务实例定期向Nacos Server发送心跳包维持注册状态
服务注册核心源码
下面是Nacos服务注册的核心源码示例:
// ServiceRegistration接口定义服务注册的基本方法 public interface ServiceRegistration<T> { void register(); void deregister(); T getRegistration(); } // NacosServiceRegistry实现了服务注册逻辑 @Service public class NacosServiceRegistry implements ServiceRegistry<NacosRegistration> { private final NacosServiceManager nacosServiceManager; private final NacosRegistrationProperties registrationProperties; public NacosServiceRegistry(NacosServiceManager nacosServiceManager, NacosRegistrationProperties registrationProperties) { this.nacosServiceManager = nacosServiceManager; this.registrationProperties = registrationProperties; } @Override public void register(NacosRegistration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } // 构建服务实例注册信息 Instance instance = getNacosInstanceFromRegistration(registration); try { // 调用Nacos客户端进行服务注册 namingService().registerInstance( registration.getServiceId(), registration.getGroupName(), instance); log.info("nacos registry, {} {}:{} register finished", registration.getServiceId(), instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry error", e); } } // 获取Nacos命名服务客户端 private NamingService namingService() throws NacosException { return nacosServiceManager.getNamingService(registrationProperties); } // 从注册信息构建Nacos实例对象 private Instance getNacosInstanceFromRegistration(NacosRegistration registration) { Instance instance = new Instance(); // 设置实例基本信息 instance.setIp(registration.getIp()); instance.setPort(registration.getPort()); instance.setWeight(registration.getWeight() == null ? 1.0F : registration.getWeight()); instance.setClusterName(registration.getClusterName()); instance.setHealthy(registration.isHealthy()); instance.setServiceName(registration.getServiceId()); instance.setInstanceId(registration.getInstanceId()); instance.setEphemeral(registration.isEphemeral()); // 设置元数据 if (registration.getMetadata() != null) { instance.setMetadata(registration.getMetadata()); } return instance; } } // NacosNamingService是Nacos命名服务的核心实现类 public class NacosNamingService implements NamingService { private final NacosServiceFactory serviceFactory; private final NamingProxy namingProxy; private final ClientWorker clientWorker; public NacosNamingService(Properties properties) throws NacosException { // 初始化相关组件 this.serviceFactory = new NacosServiceFactory(properties); this.namingProxy = serviceFactory.createNamingProxy(); this.clientWorker = new ClientWorker(namingProxy, properties); } @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 检查服务名是否合法 if (StringUtils.isEmpty(groupName)) { groupName = Constants.DEFAULT_GROUP; } // 调用客户端工作类处理注册 clientWorker.registerInstance(serviceName, groupName, instance); } // 客户端工作类处理注册逻辑 public class ClientWorker { private final NamingProxy namingProxy; private final ServiceInfoHolder serviceInfoHolder; private final ScheduledExecutorService executorService; public ClientWorker(NamingProxy namingProxy, Properties properties) { this.namingProxy = namingProxy; this.serviceInfoHolder = new ServiceInfoHolder(); this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.naming.client.Worker"); t.setDaemon(true); return t; } }); // 启动心跳任务 scheduleHeartbeat(); } public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 构建注册请求参数 Map<String, String> params = new HashMap<>(16); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("clusterName", instance.getClusterName()); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("serviceId", instance.getServiceId()); params.put("metadata", JSON.toJSONString(instance.getMetadata())); // 发送注册请求到Nacos Server namingProxy.registerInstance(params); // 加入到服务信息持有者中 serviceInfoHolder.processServiceJson( serviceName, groupName, JSON.toJSONString(Collections.singletonList(instance)) ); } // 启动心跳任务,定期发送心跳维持注册状态 private void scheduleHeartbeat() { executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 发送心跳包 clientWorker.sendHeartbeat(); } catch (Exception e) { log.error("Exception when sending heartbeat", e); } } }, 5000, 5000, TimeUnit.MILLISECONDS); } // 发送心跳方法 public void sendHeartbeat() throws NacosException { for (Map.Entry<String, List<Instance>> entry : serviceInfoHolder.getServices().entrySet()) { String serviceName = entry.getKey(); List<Instance> instances = entry.getValue(); for (Instance instance : instances) { if (!instance.isHealthy() || !instance.isEphemeral()) { continue; } // 构建心跳请求参数 Map<String, String> params = new HashMap<>(16); params.put("serviceName", serviceName); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("clusterName", instance.getClusterName()); params.put("serviceId", instance.getServiceId()); // 发送心跳请求 namingProxy.sendHeartbeat(params); } } } } }
服务发现原理
服务发现是指服务消费者从Nacos Server获取服务列表,并根据一定的负载均衡策略选择具体服务实例进行调用的过程。
核心流程如下:
- 1. 服务消费者启动时,向Nacos Server订阅所需服务
- 2. Nacos Server推送服务列表给消费者
- 3. 消费者本地缓存服务列表,并定期更新
- 4. 服务调用时,根据负载均衡策略选择具体实例
服务发现核心源码
下面是Nacos服务发现的核心源码示例:
// NacosServiceDiscovery实现了服务发现逻辑 @Service public class NacosServiceDiscovery implements ServiceDiscovery<ServiceInstance> { private final NacosServiceManager nacosServiceManager; private final NacosDiscoveryProperties discoveryProperties; public NacosServiceDiscovery(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties discoveryProperties) { this.nacosServiceManager = nacosServiceManager; this.discoveryProperties = discoveryProperties; } @Override public List<ServiceInstance> getInstances(String serviceId) throws NacosException { return getInstances(serviceId, ""); } @Override public List<ServiceInstance> getInstances(String serviceId, String group) throws NacosException { if (StringUtils.isEmpty(serviceId)) { throw new NacosException(NacosException.INVALID_PARAM, "serviceId is empty"); } if (StringUtils.isEmpty(group)) { group = discoveryProperties.getGroup(); } // 调用Nacos命名服务获取实例列表 List<Instance> instances = namingService().getInstances(serviceId, group); // 转换为标准ServiceInstance格式 return instances.stream() .map(instance -> new NacosServiceInstance(instance, serviceId)) .collect(Collectors.toList()); } @Override public List<String> getServices() throws NacosException { // 获取所有服务列表 return namingService().getServicesOfServer(1000, 0).stream() .map(serviceInfo -> serviceInfo.getName()) .collect(Collectors.toList()); } // 获取Nacos命名服务客户端 private NamingService namingService() throws NacosException { return nacosServiceManager.getNamingService(discoveryProperties.getNacosProperties()); } } // NacosNamingService中的服务发现相关方法 public class NacosNamingService implements NamingService { // 获取服务实例列表 @Override public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters) throws NacosException { if (StringUtils.isEmpty(groupName)) { groupName = Constants.DEFAULT_GROUP; } // 调用客户端工作类获取实例 return clientWorker.getInstances(serviceName, groupName, clusters); } // 客户端工作类处理服务发现逻辑 public class ClientWorker { // 服务信息持有者,缓存服务列表 private final ServiceInfoHolder serviceInfoHolder; public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters) throws NacosException { // 构建服务标识 String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName); // 从服务信息持有者获取服务信息 ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceId); // 如果服务信息为空或已过期,主动拉取 if (serviceInfo == null || serviceInfo.isExpired()) { serviceInfo = refreshServiceInfo(serviceName, groupName, clusters); } // 返回可用实例列表 return serviceInfo == null ? Collections.emptyList() : serviceInfo.getHosts(); } // 刷新服务信息 public ServiceInfo refreshServiceInfo(String serviceName, String groupName, List<String> clusters) throws NacosException { String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName); // 构建请求参数 Map<String, String> params = new HashMap<>(16); params.put("serviceName", serviceName); params.put("groupName", groupName); if (clusters != null && !clusters.isEmpty()) { params.put("clusters", StringUtils.join(clusters, ",")); } // 调用Nacos Server获取服务信息 String result = namingProxy.queryList(serviceId, params); // 处理服务信息 return serviceInfoHolder.processServiceJson(serviceId, result); } // 服务信息持有者类,负责缓存和管理服务信息 public class ServiceInfoHolder { // 服务信息缓存 private final Map<String, ServiceInfo> services = new ConcurrentHashMap<>(); // 上次更新时间 private final Map<String, Long> lastRefTime = new ConcurrentHashMap<>(); public ServiceInfo processServiceJson(String serviceId, String json) { if (StringUtils.isEmpty(json)) { return null; } ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class); if (serviceInfo != null) { // 更新服务信息 services.put(serviceId, serviceInfo); lastRefTime.put(serviceId, System.currentTimeMillis()); // 注册监听器,当服务信息变化时通知 if (null != listeners.get(serviceId)) { for (EventListener listener : listeners.get(serviceId)) { executorService.execute(new Runnable() { @Override public void run() { try { listener.onEvent(new NamingEvent(serviceId, serviceInfo)); } catch (Exception e) { log.error("EventListener execute error.", e); } } }); } } } return serviceInfo; } // 获取服务信息 public ServiceInfo getServiceInfo(String serviceId) { ServiceInfo serviceInfo = services.get(serviceId); if (serviceInfo != null) { return serviceInfo; } // 如果服务信息不存在,主动拉取 try { return refreshServiceInfo(serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[0], serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[1], null); } catch (NacosException e) { log.error("getServiceInfo error, serviceId: {}", serviceId, e); } return null; } // 服务信息是否过期 public boolean isExpired(String serviceId) { Long lastRef = lastRefTime.get(serviceId); if (lastRef == null) { return true; } // 默认15秒更新一次 return (System.currentTimeMillis() - lastRef) > 15 * 1000; } } } }
服务健康检查机制
Nacos的服务健康检查是保证服务可用性的关键机制,主要包含两种检查方式:
1. 客户端主动上报:服务实例定期向Nacos Server发送心跳包
2. 服务端主动检查:Nacos Server定期向服务实例发送健康检查请求
健康检查的核心源码涉及到ClientWorker类中的心跳机制和Server端的检查逻辑,上述源码中已包含客户端心跳相关部分。
服务配置与同步机制
Nacos采用了长轮询和推送相结合的方式实现服务配置的实时同步:
1. 客户端发起长轮询请求到服务端
2. 服务端有变更时立即响应,无变更则等待一段时间后响应
3. 客户端收到响应后立即发起新的长轮询请求
4. 服务端也可以主动推送变更到客户端
这种机制保证了服务信息的实时性和一致性,同时减少了客户端与服务端的通信开销。
总结
Nacos的服务注册与发现机制通过简洁而高效的设计,实现了微服务的自动注册、发现和健康管理。其核心原理包括:
- 基于客户端的服务注册与心跳维持
- 基于长轮询和推送的服务信息同步
- 灵活的服务发现与负载均衡策略
- 可靠的服务健康检查机制
通过上述源码可以看到,Nacos通过NamingService接口封装了核心功能,ClientWorker处理具体的注册、发现和心跳逻辑,ServiceInfoHolder负责服务信息的缓存和管理,整体架构清晰且易于扩展。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。