java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Nacos服务注册与发现原理

Nacos服务注册与发现原理解读

作者:shangjg3

Nacos是阿里巴巴开源的微服务组件,支持服务注册、发现及配置管理,采用客户端心跳维护、服务端推送与长轮询同步,结合健康检查机制,实现高效可靠的微服务治理

Nacos是阿里巴巴开源的服务注册与发现组件,同时也提供配置管理功能。它支持基于DNS和RPC的服务发现,致力于帮助开发人员发现、配置和管理微服务。

下面将深入解析其服务注册与发现的核心原理,并提供关键源码示例。

核心原理架构

Nacos的服务注册与发现架构主要包含三个核心组件:

服务注册原理

服务注册是指服务提供者将自身服务实例的信息注册到Nacos Server的过程。

其核心流程如下:

服务注册核心源码

下面是Nacos服务注册的核心源码示例:

// ServiceRegistration接口定义服务注册的基本方法
public interface ServiceRegistration<T> {
    void register();
    void deregister();
    T getRegistration();
}

// NacosServiceRegistry实现了服务注册逻辑
@Service
public class NacosServiceRegistry implements ServiceRegistry<NacosRegistration> {
    
    private final NacosServiceManager nacosServiceManager;
    private final NacosRegistrationProperties registrationProperties;
    
    public NacosServiceRegistry(NacosServiceManager nacosServiceManager,
                               NacosRegistrationProperties registrationProperties) {
        this.nacosServiceManager = nacosServiceManager;
        this.registrationProperties = registrationProperties;
    }
    
    @Override
    public void register(NacosRegistration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        
        // 构建服务实例注册信息
        Instance instance = getNacosInstanceFromRegistration(registration);
        
        try {
            // 调用Nacos客户端进行服务注册
            namingService().registerInstance(
                registration.getServiceId(), 
                registration.getGroupName(), 
                instance);
            log.info("nacos registry, {} {}:{} register finished",
                registration.getServiceId(),
                instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry error", e);
        }
    }
    
    // 获取Nacos命名服务客户端
    private NamingService namingService() throws NacosException {
        return nacosServiceManager.getNamingService(registrationProperties);
    }
    
    // 从注册信息构建Nacos实例对象
    private Instance getNacosInstanceFromRegistration(NacosRegistration registration) {
        Instance instance = new Instance();
        // 设置实例基本信息
        instance.setIp(registration.getIp());
        instance.setPort(registration.getPort());
        instance.setWeight(registration.getWeight() == null ? 1.0F : registration.getWeight());
        instance.setClusterName(registration.getClusterName());
        instance.setHealthy(registration.isHealthy());
        instance.setServiceName(registration.getServiceId());
        instance.setInstanceId(registration.getInstanceId());
        instance.setEphemeral(registration.isEphemeral());
        
        // 设置元数据
        if (registration.getMetadata() != null) {
            instance.setMetadata(registration.getMetadata());
        }
        
        return instance;
    }
}

// NacosNamingService是Nacos命名服务的核心实现类
public class NacosNamingService implements NamingService {
    
    private final NacosServiceFactory serviceFactory;
    private final NamingProxy namingProxy;
    private final ClientWorker clientWorker;
    
    public NacosNamingService(Properties properties) throws NacosException {
        // 初始化相关组件
        this.serviceFactory = new NacosServiceFactory(properties);
        this.namingProxy = serviceFactory.createNamingProxy();
        this.clientWorker = new ClientWorker(namingProxy, properties);
    }
    
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // 检查服务名是否合法
        if (StringUtils.isEmpty(groupName)) {
            groupName = Constants.DEFAULT_GROUP;
        }
        
        // 调用客户端工作类处理注册
        clientWorker.registerInstance(serviceName, groupName, instance);
    }
    
    // 客户端工作类处理注册逻辑
    public class ClientWorker {
        private final NamingProxy namingProxy;
        private final ServiceInfoHolder serviceInfoHolder;
        private final ScheduledExecutorService executorService;
        
        public ClientWorker(NamingProxy namingProxy, Properties properties) {
            this.namingProxy = namingProxy;
            this.serviceInfoHolder = new ServiceInfoHolder();
            this.executorService = Executors.newScheduledThreadPool(1, 
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.naming.client.Worker");
                        t.setDaemon(true);
                        return t;
                    }
                });
            
            // 启动心跳任务
            scheduleHeartbeat();
        }
        
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
            // 构建注册请求参数
            Map<String, String> params = new HashMap<>(16);
            params.put("serviceName", serviceName);
            params.put("groupName", groupName);
            params.put("ip", instance.getIp());
            params.put("port", String.valueOf(instance.getPort()));
            params.put("weight", String.valueOf(instance.getWeight()));
            params.put("clusterName", instance.getClusterName());
            params.put("ephemeral", String.valueOf(instance.isEphemeral()));
            params.put("serviceId", instance.getServiceId());
            params.put("metadata", JSON.toJSONString(instance.getMetadata()));
            
            // 发送注册请求到Nacos Server
            namingProxy.registerInstance(params);
            
            // 加入到服务信息持有者中
            serviceInfoHolder.processServiceJson(
                serviceName, groupName, 
                JSON.toJSONString(Collections.singletonList(instance))
            );
        }
        
        // 启动心跳任务,定期发送心跳维持注册状态
        private void scheduleHeartbeat() {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 发送心跳包
                        clientWorker.sendHeartbeat();
                    } catch (Exception e) {
                        log.error("Exception when sending heartbeat", e);
                    }
                }
            }, 5000, 5000, TimeUnit.MILLISECONDS);
        }
        
        // 发送心跳方法
        public void sendHeartbeat() throws NacosException {
            for (Map.Entry<String, List<Instance>> entry : serviceInfoHolder.getServices().entrySet()) {
                String serviceName = entry.getKey();
                List<Instance> instances = entry.getValue();
                
                for (Instance instance : instances) {
                    if (!instance.isHealthy() || !instance.isEphemeral()) {
                        continue;
                    }
                    
                    // 构建心跳请求参数
                    Map<String, String> params = new HashMap<>(16);
                    params.put("serviceName", serviceName);
                    params.put("ip", instance.getIp());
                    params.put("port", String.valueOf(instance.getPort()));
                    params.put("clusterName", instance.getClusterName());
                    params.put("serviceId", instance.getServiceId());
                    
                    // 发送心跳请求
                    namingProxy.sendHeartbeat(params);
                }
            }
        }
    }
}

服务发现原理

服务发现是指服务消费者从Nacos Server获取服务列表,并根据一定的负载均衡策略选择具体服务实例进行调用的过程。

核心流程如下:

服务发现核心源码

下面是Nacos服务发现的核心源码示例:

// NacosServiceDiscovery实现了服务发现逻辑
@Service
public class NacosServiceDiscovery implements ServiceDiscovery<ServiceInstance> {
    
    private final NacosServiceManager nacosServiceManager;
    private final NacosDiscoveryProperties discoveryProperties;
    
    public NacosServiceDiscovery(NacosServiceManager nacosServiceManager,
                               NacosDiscoveryProperties discoveryProperties) {
        this.nacosServiceManager = nacosServiceManager;
        this.discoveryProperties = discoveryProperties;
    }
    
    @Override
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        return getInstances(serviceId, "");
    }
    
    @Override
    public List<ServiceInstance> getInstances(String serviceId, String group) throws NacosException {
        if (StringUtils.isEmpty(serviceId)) {
            throw new NacosException(NacosException.INVALID_PARAM, "serviceId is empty");
        }
        
        if (StringUtils.isEmpty(group)) {
            group = discoveryProperties.getGroup();
        }
        
        // 调用Nacos命名服务获取实例列表
        List<Instance> instances = namingService().getInstances(serviceId, group);
        
        // 转换为标准ServiceInstance格式
        return instances.stream()
            .map(instance -> new NacosServiceInstance(instance, serviceId))
            .collect(Collectors.toList());
    }
    
    @Override
    public List<String> getServices() throws NacosException {
        // 获取所有服务列表
        return namingService().getServicesOfServer(1000, 0).stream()
            .map(serviceInfo -> serviceInfo.getName())
            .collect(Collectors.toList());
    }
    
    // 获取Nacos命名服务客户端
    private NamingService namingService() throws NacosException {
        return nacosServiceManager.getNamingService(discoveryProperties.getNacosProperties());
    }
}

// NacosNamingService中的服务发现相关方法
public class NacosNamingService implements NamingService {
    
    // 获取服务实例列表
    @Override
    public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters) 
        throws NacosException {
        if (StringUtils.isEmpty(groupName)) {
            groupName = Constants.DEFAULT_GROUP;
        }
        
        // 调用客户端工作类获取实例
        return clientWorker.getInstances(serviceName, groupName, clusters);
    }
    
    // 客户端工作类处理服务发现逻辑
    public class ClientWorker {
        // 服务信息持有者,缓存服务列表
        private final ServiceInfoHolder serviceInfoHolder;
        
        public List<Instance> getInstances(String serviceName, String groupName, List<String> clusters) 
            throws NacosException {
            // 构建服务标识
            String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName);
            
            // 从服务信息持有者获取服务信息
            ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceId);
            
            // 如果服务信息为空或已过期,主动拉取
            if (serviceInfo == null || serviceInfo.isExpired()) {
                serviceInfo = refreshServiceInfo(serviceName, groupName, clusters);
            }
            
            // 返回可用实例列表
            return serviceInfo == null ? Collections.emptyList() : serviceInfo.getHosts();
        }
        
        // 刷新服务信息
        public ServiceInfo refreshServiceInfo(String serviceName, String groupName, List<String> clusters) 
            throws NacosException {
            String serviceId = ServiceIdBuilder.buildServiceId(groupName, serviceName);
            
            // 构建请求参数
            Map<String, String> params = new HashMap<>(16);
            params.put("serviceName", serviceName);
            params.put("groupName", groupName);
            if (clusters != null && !clusters.isEmpty()) {
                params.put("clusters", StringUtils.join(clusters, ","));
            }
            
            // 调用Nacos Server获取服务信息
            String result = namingProxy.queryList(serviceId, params);
            
            // 处理服务信息
            return serviceInfoHolder.processServiceJson(serviceId, result);
        }
        
        // 服务信息持有者类,负责缓存和管理服务信息
        public class ServiceInfoHolder {
            // 服务信息缓存
            private final Map<String, ServiceInfo> services = new ConcurrentHashMap<>();
            // 上次更新时间
            private final Map<String, Long> lastRefTime = new ConcurrentHashMap<>();
            
            public ServiceInfo processServiceJson(String serviceId, String json) {
                if (StringUtils.isEmpty(json)) {
                    return null;
                }
                
                ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
                
                if (serviceInfo != null) {
                    // 更新服务信息
                    services.put(serviceId, serviceInfo);
                    lastRefTime.put(serviceId, System.currentTimeMillis());
                    
                    // 注册监听器,当服务信息变化时通知
                    if (null != listeners.get(serviceId)) {
                        for (EventListener listener : listeners.get(serviceId)) {
                            executorService.execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        listener.onEvent(new NamingEvent(serviceId, serviceInfo));
                                    } catch (Exception e) {
                                        log.error("EventListener execute error.", e);
                                    }
                                }
                            });
                        }
                    }
                }
                
                return serviceInfo;
            }
            
            // 获取服务信息
            public ServiceInfo getServiceInfo(String serviceId) {
                ServiceInfo serviceInfo = services.get(serviceId);
                if (serviceInfo != null) {
                    return serviceInfo;
                }
                
                // 如果服务信息不存在,主动拉取
                try {
                    return refreshServiceInfo(serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[0],
                        serviceId.split(ServiceIdBuilder.SERVICE_ID_SEPARATOR)[1], null);
                } catch (NacosException e) {
                    log.error("getServiceInfo error, serviceId: {}", serviceId, e);
                }
                
                return null;
            }
            
            // 服务信息是否过期
            public boolean isExpired(String serviceId) {
                Long lastRef = lastRefTime.get(serviceId);
                if (lastRef == null) {
                    return true;
                }
                
                // 默认15秒更新一次
                return (System.currentTimeMillis() - lastRef) > 15 * 1000;
            }
        }
    }
}

服务健康检查机制

Nacos的服务健康检查是保证服务可用性的关键机制,主要包含两种检查方式:

1. 客户端主动上报:服务实例定期向Nacos Server发送心跳包

2. 服务端主动检查:Nacos Server定期向服务实例发送健康检查请求

健康检查的核心源码涉及到ClientWorker类中的心跳机制和Server端的检查逻辑,上述源码中已包含客户端心跳相关部分。

服务配置与同步机制

Nacos采用了长轮询和推送相结合的方式实现服务配置的实时同步:

1. 客户端发起长轮询请求到服务端

2. 服务端有变更时立即响应,无变更则等待一段时间后响应

3. 客户端收到响应后立即发起新的长轮询请求

4. 服务端也可以主动推送变更到客户端

这种机制保证了服务信息的实时性和一致性,同时减少了客户端与服务端的通信开销。

总结

Nacos的服务注册与发现机制通过简洁而高效的设计,实现了微服务的自动注册、发现和健康管理。其核心原理包括:

通过上述源码可以看到,Nacos通过NamingService接口封装了核心功能,ClientWorker处理具体的注册、发现和心跳逻辑,ServiceInfoHolder负责服务信息的缓存和管理,整体架构清晰且易于扩展。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文