当前位置:首页 > 数据库

Nacos源码系列—关于服务端那些事儿

前言

在上节课中,源于服我们讲解了客户端注册服务的码系大体流程,客户端在注册服务的列关时候调用的是 NamingService.registerInstance 来完成实例的注册,在最后呢我们知道服务注册是那事通过 nacos/v1/ns/instance 接口来完成注册的,我们今天来讲解服务端的源于服注册,首先就从这个接口地址开始,码系来看具体服务端都做了哪些事情。列关

服务注册

上面是那事我们从官网中找到的Nacos架构图,从这个图中我们大体可以得出我们要找的源于服接口应该是在NamingService这个服务中,同时我们在项目结构中也可以看到naming这个模块,码系naming就是列关实现服务注册的,我们都知道请求路径都是通过controller来进行处理的,而在其中我们可以看到一个InstanceController的那事这么一个类,那么注册实例肯定会和它有关。源于服可以看到InstanceController类的码系请求路由即是我们POST请求的路由的香港云服务器部分,如下:

所以,列关我们就从开始研究接收请求处理服务注册的源码,我们找到通过RestFul API接口,请求类型为Post,的方法,符合条件的只有InstanceController.register方法,这个方法用来接收用户的请求,并且把收到的信息进行解析,装换成实例信息,然后通过getInstanceOperator().registerInstance进行调用,这个方法也是服务注册的核心。

@CanDistro

@PostMapping

@Secured(action = ActionTypes.WRITE)

public String register(HttpServletRequest request) throws Exception {

//从 request信息中获取namespaceId,如果没有默认为public

final String namespaceId = WebUtils

.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

//获取服务名称 格式:“group@@serviceName”

final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

NamingUtils.checkServiceNameFormat(serviceName);

//将request参数还原成instance实例

final Instance instance = HttpRequestInstanceBuilder.newBuilder()

.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

//【核心】注册服务实例

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

return "ok";

}

我们先来看一下下面这个核心方法;

private InstanceOperator getInstanceOperator() {

return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;

}

getInstanceOperator() 这个判断是否走Grpc协议,默认走Grpc,所以我们使用的是instanceServiceV2这个实例对象;

private InstanceOperator getInstanceOperator() {

return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;

}

instanceServiceV2就是InstanceOperatorClientImpl,方法所以我们需要进入的是下面这个实例的处理类。

具体方法如下所示:

@Override

public void registerInstance(String namespaceId, String serviceName, Instance instance) {

//判断是否为临时客户端

boolean ephemeral = instance.isEphemeral();

//获取客户端ID

String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);

//通过客户端ID创建客户端连接

createIpPortClientIfAbsent(clientId);

//获取服务信息

Service service = getService(namespaceId, serviceName, ephemeral);

//注册服务

clientOperationService.registerInstance(service, instance, clientId);

}

从Nacos2.0以后,云服务器提供商新增了Client模型,管理与该客户机有关的数据内容,如果一个客户机发布了一个服务,那么这个客户机发布的所有服务和订阅者信息都会被更新到一个Client对象中,这个Client对象对应于这个客户机的链接,然后通过事件机制触发索引信息的更新。Client负责管理一个客户机的服务实例注册Publish和服务订阅Subscribe,可以方便地对需要推送的服务范围进行快速聚合,同时一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的 clientId。

package com.alibaba.nacos.naming.core.v2.client;

public interface Client {

// 客户端id

String getClientId();

// 是否临时客户端

boolean isEphemeral();

//设置客户端更新时间

void setLastUpdatedTime();

//获取客户端更新时间

long getLastUpdatedTime();

// 服务实例注册

boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);

//服务实例移除

InstancePublishInfo removeServiceInstance(Service service);

//服务实例查询

InstancePublishInfo getInstancePublishInfo(Service service);

CollectiongetAllPublishedService();

// 服务订阅

boolean addServiceSubscriber(Service service, Subscriber subscriber);

///取消订阅

boolean removeServiceSubscriber(Service service);

//查询订阅

Subscriber getSubscriber(Service service);

CollectiongetAllSubscribeService();

// 生成同步给其他节点的client数据

ClientSyncData generateSyncData();

// 是否过期

boolean isExpire(long currentTime);

// 释放资源

void release();

}

知道了Client模型后,我们来接着从clientOperationService.registerInstance(service, instance, clientId);找到对应的具体实现。

EphemeralClientOperationServiceImpl.registerInstance()

下面这个方法是具体来负责处理服务注册,我们来详细了解一下:

@Override

public void registerInstance(Service service, Instance instance, String clientId) {

//确保Service单例存在,注意Service的equals和hasCode方法

Service singleton = ServiceManager.getInstance().getSingleton(service);

//如果不是临时客户端

if (!singleton.isEphemeral()) {

throw new NacosRuntimeException(NacosException.INVALID_PARAM,

String.format("Current service %s is persistent service, cant register ephemeral instance.",

singleton.getGroupedServiceName()));

}

//根据客户端ID找到客户端信息,这个关系在连接建立的时候存储

Client client = clientManager.getClient(clientId);

if (!clientIsLegal(client, clientId)) {

return;

}

//将客户端实例模型,装换成服务端实例模型

InstancePublishInfo instanceInfo = getPublishInfo(instance);

//将实例存储到client中

client.addServiceInstance(singleton, instanceInfo);

//设置最后更新时间

client.setLastUpdatedTime();

//建立服务和客户端的关联关系

NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));

NotifyCenter

.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));

}

ServiceManager.getInstance().getSingleton() 当调用getSingleton的时候会负责管理service的云服务器单例,在这里service会重写equlas和hasCode方法作为key。

public class ServiceManager {

//单例service 看service中equals和hasCode方法

private final ConcurrentHashMapsingletonRepository;

//namespace下所有的service

private final ConcurrentHashMap> namespaceSingletonMaps;

//通过Map储存单例的Service

public Service getSingleton(Service service) {

singletonRepository.putIfAbsent(service, service);

Service result = singletonRepository.get(service);

namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());

namespaceSingletonMaps.get(result.getNamespace()).add(result);

return result;

}

}

service中 equal和hasCode方法,namespace+group+name在服务端是一个单例Service。

@Override

public boolean equals(Object o) {

if (this == o) {

return true;

}

if (!(o instanceof Service)) {

return false;

}

Service service = (Service) o;

return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);

}

@Override

public int hashCode() {

return Objects.hash(namespace, group, name);

}

clientManager.getClient() 这里对应的实现类为ConnectionBasedClientManager这个实现类负责管理长连接clientId和client模型的映射关系。

@Component("connectionBasedClientManager")

public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {

//通过map存储ID和client之间的关联关系

private final ConcurrentMapclients = new ConcurrentHashMap<>();

//根据clientId查询client

@Override

public Client getClient(String clientId) {

return clients.get(clientId);

}

}

client.addServiceInstance(); 抽象类为AbstractClient:负责存储当前客户端服务注册表,也就是 service和instance的关系。

protected final ConcurrentHashMappublishers = new ConcurrentHashMap<>(16, 0.75f, 1);、

//将service和实例进行关联

@Override

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {

if (null == publishers.put(service, instancePublishInfo)) {

//监控指标自增实例数

MetricsMonitor.incrementInstanceCount();

}

NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));

Loggers.SRV_LOG.info("Client change for service { }, { }", service, getClientId());

return true;

}

ClientOperationEvent.ClientRegisterServiceEvent() :这里目的是为了过滤目标服务得到最终instance列表建立service和client的关系,能够方便我们快速查询,同时会触发ClientServiceIndexesManager的监听事件。

//服务与发布client的关系

private final ConcurrentMap> publisherIndexes = new ConcurrentHashMap<>();

//服务与订阅clientId的关系

private final ConcurrentMap> subscriberIndexes = new ConcurrentHashMap<>();

private void handleClientOperation(ClientOperationEvent event) {

Service service = event.getService();

String clientId = event.getClientId();

if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {

addPublisherIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {

removePublisherIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {

addSubscriberIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {

removeSubscriberIndexes(service, clientId);

}

}

private void addPublisherIndexes(Service service, String clientId) {

publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());

publisherIndexes.get(service).add(clientId);

NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));

}

private void removePublisherIndexes(Service service, String clientId) {

if (!publisherIndexes.containsKey(service)) {

return;

}

publisherIndexes.get(service).remove(clientId);

NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));

}

private void addSubscriberIndexes(Service service, String clientId) {

subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());

// Fix #5404, Only first time add need notify event.

if (subscriberIndexes.get(service).add(clientId)) {

NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));

}

}

private void removeSubscriberIndexes(Service service, String clientId) {

if (!subscriberIndexes.containsKey(service)) {

return;

}

subscriberIndexes.get(service).remove(clientId);

if (subscriberIndexes.get(service).isEmpty()) {

subscriberIndexes.remove(service);

}

}

请求流程图:

服务端监控检查

Nacos作为注册中心不止提供了服务注册和服务发现的功能,还提供了服务可用性检测的功能,在1.0的版本中,临时实例走的是distro协议,客户端向注册中心发送心跳来维持自身的健康(healthy)状态,持久实例则走的是Raft协议存储。

1.两种检测机制:

客户端主动上报机制服务器端主动下探机制

客户端主动上报机制:你主动找上级,说你没有打卡(不健康状态)。

服务器端主动下探机制:上级检测到你有不打卡的记录,主动来找你。

对于Nacos健康检测机制,我们不能主动去设置,但是健康检查机制是和Nacos的服务实例类型强相关,主要是有两种服务实例:

临时实例:客户端主动上报持久实例:服务端主动下探客户端主动上报

临时实例每隔5秒会主动上报自己的健康状态,发送心跳,如果发送心跳的间隔时间超过15秒,Nacos服务器端会将服务标记为亚健康状态,如果超过30S没有发送心跳,那么服务实例会被从服务列表中剔除。

在2.0版本以后,持久实例不变,临时实例而是通过长连接来判断实例是否健康。

2.长连接:

一个连接上可以连续发送多数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包,在Nacos2.0之后,使用Grpc协议代替了http协议。长连接会保持客户端和服务端发送的状态,在源码中ConnectionManager 管理所有客户端的长连接。

ConnectionManager: 每3秒检测所有超过20S内没有发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在1s内成功响应,则检测通过,否则执行unregister方法移除Connection。

如果客户端持续和服务端进行通讯,服务端是不需要主动下探的,只有当客户端没有一直和服务端通信的时候,服务端才会主动下探操作。

@Service

public class ConnectionManager extends Subscriber{

Mapconnections = new ConcurrentHashMap();

//只要spring容器启动,会触发这个方法

@PostConstruct

public void start() {

// 启动不健康连接排除功能.

RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() {

// 1. 统计过时(20s)连接

Set> entries = connections.entrySet();

//2.获得需要剔除的IP和端口

//3.根据限制获取剔除的IP和端口

//4.如果还是有需要剔除的客户端,则继续执行

//5.没有活动的客户端执行探测

//6.如果没有马上响应,则马上剔除

//7.剔除后发布ClientDisconnectEvent事件

}

});

}

}

//注销(移出)连接方法

public synchronized void unregister(String connectionId) {

Connection remove = this.connections.remove(connectionId);

if (remove != null) {

String clientIp = remove.getMetaInfo().clientIp;

AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);

if (atomicInteger != null) {

int count = atomicInteger.decrementAndGet();

if (count <= 0) {

connectionForClientIp.remove(clientIp);

}

}

remove.close();

Loggers.REMOTE_DIGEST.info("[{ }]Connection unregistered successfully. ", connectionId);

clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);

}

当服务端操作移除事件以后,会操作notifyClientDisConnected()方法,主要调用的是 clientConnectionEventListener.clientDisConnected(connection)方法,将连接信息传入进去。

public void notifyClientDisConnected(final Connection connection) {

for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {

try {

clientConnectionEventListener.clientDisConnected(connection);

} catch (Throwable throwable) {

Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener { }",

clientConnectionEventListener.getName(), throwable);

}

}

clientConnectionEventListenerd的实现类是ConnectionBasedClientManager,在这里面会出发清除索引缓存等操作。

@Component("connectionBasedClientManager")

public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {

@Override

public boolean clientDisconnected(String clientId) {

Loggers.SRV_LOG.info("Client connection { } disconnect, remove instances and subscribers", clientId);

//同步移除client数据

ConnectionBasedClient client = clients.remove(clientId);

if (null == client) {

return true;

}

client.release();

//服务订阅,将变更通知到客户端

NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));

return true;

}

}

总结

到这里Nacos服务端的基础的源码就讲完了,有些地方我们没有展开来讲,在后续的源码讲解中,会给大家详细的进行讲解,今天主要讲解了,服务端注册以及监控检查的基础代码,后面会有最新的内容呈现给大家,如果觉得文中对您有帮助的。

分享到:

滇ICP备2023006006号-16