blog

スプリングクラウド-アリババ-nacos-discovery

どのような用途で春のソースコードを読むために私に尋ねないでください、尋ねることは無駄です、ちょうど私が自分の自信の過程で春を使用してみましょう!春に関連する投稿 - 一般的なフローチャートを説明するた...

May 30, 2020 · 24 min. read
シェア

私にスプリングのソースコードを読めと言うのは無駄なことです!

説明

大まかなフローチャート

1. nacos-configについては後述する。

springnacosの統合

上図は、プロセス全体を3つの部分に大別したものである。
1. springnacosを統合する
2. nacosクライアント側の処理
3. nacosサーバー側の処理

nacos

1. サードパーティコンポーネントの一般的な春の統合では、少なくとも2つのパッケージが存在する。1つはspring-boot/cloud-XXX-starter用、もう1つはサービスコアパッケージ用で、つまりスタートアップパッケージとコアパッケージである。
2. springnacosサービスとディスカバリーを統合するためのスターターパッケージはspring-cloud-Alibaba-nacoc-discoveryである。
3. このパッケージが行うのは、現在のジャーのスプリングを登録することである。.factoriesクラスをspringiocコンテナに登録する。<spring-boot-XXX-starterとその原則を手書きする>
//すべてはNacosServiceRegistryAutoConfigurationから始まる。
//NacosServiceRegistryAutoConfigurationspring-cloud-Alibaba-nacoc-discoveryにある。.factoriesnacosクラスタに登録されている
//com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration.nacosServiceRegistry(NacosDiscoveryProperties)
//nacosServiceRegistryを作成する 
@Bean
public NacosServiceRegistry nacosServiceRegistry(
		NacosDiscoveryProperties nacosDiscoveryProperties) {
	return new NacosServiceRegistry(nacosDiscoveryProperties);
}
nacosServiceRegistry含まれるプロパティ
 NacosDiscoveryProperties
 NamingService
//このメソッドは、nacosServiceRegistryの2つのプロパティにそれぞれ値を代入しているだけのように見える。
//NacosDiscoveryPropertiesこれはコンフィギュレーション・パラメータ・クラスである。@Bean 
//主なクラスはNamingServiceで、これはnacosのコンポーネント・クラスである。
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
	this.nacosDiscoveryProperties = nacosDiscoveryProperties;
	this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
public NamingService namingServiceInstance() {
 //ダイレクト・リターンがある場合
	if (null != namingService) {
		return namingService;
	}
	try {
	 //お持ちでない場合は作成する
		namingService = NacosFactory.createNamingService(getNacosProperties());
	}
	catch (Exception e) {
		log.error("create naming service error!properties={},e=,", this, e);
		return null;
	}
	return namingService;
}
//getNacosPropertiesこの方法では、サービスに関するいくつかの情報を得る。
//  
 public static NamingService createNamingService(Properties properties) throws NacosException {
 return NamingFactory.createNamingService(properties);
}
//
public static NamingService createNamingService(Properties properties) throws NacosException {
 try {
 //リフレクションによって作成されたことに気づくだろう。
 //clssオブジェクトを取得する
 Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
 Constructor constructor = driverImplClass.getConstructor(Properties.class);
 //パラメトリック・コンストラクタを呼び出す
 NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
 return vendorImpl;
 } catch (Throwable e) {
 throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
 }
}
//com.alibaba.nacos.client.naming.NacosNamingService.NacosNamingService(Properties)
 public NacosNamingService(Properties properties) {
 init(properties);
 }
 
 //初期化 
 private void init(Properties properties) {
 //名前空間パブリック
 //以下のコード行は、その名の通り初期化作業を行うものだが、私はあまり見ていない。
 namespace = InitUtils.initNamespaceForNaming(properties);
 initServerAddr(properties);
 InitUtils.initWebRootContext();
 initCacheDir();
 initLogName(properties);
 eventDispatcher = new EventDispatcher();
 //この中には、nacosサービス自体に特有と思われる時限タスクが2つあるが、私は見ていない。
 serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
 
 //ハートビートという名前から推測できるように、ハートビートを送信するタスクを処理する時間指定スレッドのプールがある。
 //initClientBeatThreadCountこのメソッドは、マシンに基づいてスレッド数を選択する。
 beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
 //これはサービスの変更を処理するタスクである。
 hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
 initPollingThreadCount(properties));
}

nacos


//nacosクライアントの起動元はスプリングリスナーである
//NacosAutoServiceRegistrationはNacosServiceRegistryAutoConfigurationにある。@bean 
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
		NacosServiceRegistry registry,
		AutoServiceRegistrationProperties autoServiceRegistrationProperties,
		NacosRegistration registration) {
	return new NacosAutoServiceRegistration(registry,
			autoServiceRegistrationProperties, registration);
}
NacosAutoServiceRegistration(nacosを実装している。
ApplicationListener<WebServerInitializedEvent> ウェブの初期化イベントをリッスンする
//リスナーの原理についてはここでは触れないが、スプリング・リスナーの原理についてはいつか別の記事を書くつもりだ。
//ここで知っておくべきことは、リスニング・イベントがトリガーされると、実装クラスのonApplicationEventメソッドが呼び出されるということだ。
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
	//イベント実行エントリーをリスニングする
	bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
	ApplicationContext context = event.getApplicationContext();
	if (context instanceof ConfigurableWebServerApplicationContext) {
		if ("management".equals(((ConfigurableWebServerApplicationContext) context)
				.getServerNamespace())) {
			return;
		}
	}
	this.port.compareAndSet(0, event.getWebServer().getPort());
	// 
	this.start();
}
public void start() {
	if (!isEnabled()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}
		return;
	}
	// only initialize if nonSecurePort is greater than 0 and it isn't already running
	// because of containerPortInitializer below
	if (!this.running.get()) {
		this.context.publishEvent(
				new InstancePreRegisteredEvent(this, getRegistration()));
		//登録を開始する
		//このコードもすべてspring-cloudで実行され、いくつかの共通インターフェースを提供する。
		register();
		if (shouldRegisterManagement()) {
			registerManagement();
		}
		this.context.publishEvent(
				new InstanceRegisteredEvent<>(this, getConfiguration()));
		this.running.compareAndSet(false, true);
	}
}
@Override
//現在配置されているNacosAutoServiceRegistrationクラス
protected void register() {
	if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
		log.debug("Registration disabled.");
		return;
	}
	if (this.registration.getPort() < 0) {
		this.registration.setPort(getPort().get());
	}
	//親クラスのregisterメソッドを呼び出す
	super.register();
}
//現在位置 AbstractAutoServiceRegistration 
protected void register() {
	//serviceRegistryプロパティのメソッドを呼び出す
	//上記のコードは、全体的に仕事をするようになる:
	//実際の登録コードは、AbstractAutoServiceRegistrationのserviceRegistryプロパティのregisterから始まる。
	//nacosNacosServiceRegistryクラスはServiceRegistryインターフェイスを実装している。
	//NacosAutoServiceRegistrationAbstractAutoServiceRegistration を継承する。
	//現在のNacosAutoServiceRegistrationクラスの実装は、プロパティserviceRegistryを持ち、これはnacosが提供するNacos Service Registryによって実装されている。
	//全体的な考え方は、スプリングが共通のコードとインターフェースのセットを提供し、サードパーティーのコンポーネントがそれらを実装するというものだ。
	this.serviceRegistry.register(getRegistration());
}
//nacos開始するコード
//現在のクラスは、前述の春のnacos統合で使用されたものである。 @bean 登録済み 
@Override
public void register(Registration registration) {
 
 //サービス名であるserviceIdを取得する
 //マイクロサービス用にspringをセットアップしていない場合は.application.name サービスがnacosに登録されていない
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();
	String group = nacosDiscoveryProperties.getGroup();
 //ip、ポートなどのインスタンス情報をカプセル化する。
	Instance instance = getNacosInstanceFromRegistration(registration);
	try {
	 //namingServiceの登録メソッドを呼び出す
	 //前述のように、namingServiceはリフレクションによって作成され、そこにはいくつかの初期化情報がある。
	 //例えば、ハートビート送信用のスレッド・プールはinitで作成され、プロパティBeatReactorに格納される。...nacos
		namingService.registerInstance(serviceId, group, instance);
		log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
				instance.getIp(), instance.getPort());
	}
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
 //一時的なインスタンスかどうか。
 if (instance.isEphemeral()) {
 //ハートビートの送信に必要な情報をラップする 
 BeatInfo beatInfo = new BeatInfo();
 beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
 beatInfo.setIp(instance.getIp());
 beatInfo.setPort(instance.getPort());
 beatInfo.setCluster(instance.getClusterName());
 beatInfo.setWeight(instance.getWeight());
 beatInfo.setMetadata(instance.getMetadata());
 beatInfo.setScheduled(false);
 beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
 //ハートビートを送信するタスクを作成する。
 beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
 }
 
 //メインライン登録コード
 serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

ハートビート・タスクの送信

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
	NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
	//サービス名がservice-ribbon-test01の場合、キーを取得する。
	//キーはDEFAULT_GROUP@@service-ribbon-test01#.109#7001
	String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
	BeatInfo existBeat = null;
	//fix #1733
	if ((existBeat = dom2Beat.remove(key)) != null) {
		existBeat.setStopped(true);
	}
	//dom2Beatを入れる......正直なところ、これが何をするものなのか知らないし、実際に見たこともない。
	dom2Beat.put(key, beatInfo);
	//Runnableを実装したタスクBeatTaskを作成する。 
	//デフォルトでは、このメソッドは5秒後に実行されるが、実行されるのは1回だけである。 ハートビートが常に送信される理由は、BeatTaskで処理される。
	executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
	MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
//BeatTask
class BeatTask implements Runnable {
 BeatInfo beatInfo;
 public BeatTask(BeatInfo beatInfo) {
 this.beatInfo = beatInfo;
 }
 @Override
 //このメソッドを見る
 public void run() {
 if (beatInfo.isStopped()) {
 return;
 }
 //インターバルを記録する
 long nextTime = beatInfo.getPeriod();
 try {
 //カプセル化されたHTTPリクエストであるハートビートの送信方法。
 JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
 long interval = result.getIntValue("clientBeatInterval");
 boolean lightBeatEnabled = false;
 if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
 lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
 }
 BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
 if (interval > 0) {
 nextTime = interval;
 }
 int code = NamingResponseCode.OK;
 if (result.containsKey(CommonParams.CODE)) {
 code = result.getIntValue(CommonParams.CODE);
 }
 //この場合、再度登録する。
 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
 Instance instance = new Instance();
 instance.setPort(beatInfo.getPort());
 instance.setIp(beatInfo.getIp());
 instance.setWeight(beatInfo.getWeight());
 instance.setMetadata(beatInfo.getMetadata());
 instance.setClusterName(beatInfo.getCluster());
 instance.setServiceName(beatInfo.getServiceName());
 instance.setInstanceId(instance.getInstanceId());
 instance.setEphemeral(true);
 try {
 serverProxy.registerService(beatInfo.getServiceName(),
 NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
 } catch (Exception ignore) {
 }
 }
 } catch (NacosException ne) {
 NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
 JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
 }
 //ハートビート送信後、一定時間待って再度beattaskタスクを作成する。
 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
 }
}

登録

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
 NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
 namespaceId, serviceName, instance);
 //リクエスト情報をカプセル化する
 final Map<String, String> params = new HashMap<String, String>(9);
 params.put(CommonParams.NAMESPACE_ID, namespaceId);
 params.put(CommonParams.SERVICE_NAME, serviceName);
 params.put(CommonParams.GROUP_NAME, groupName);
 params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
 params.put("ip", instance.getIp());
 params.put("port", String.valueOf(instance.getPort()));
 params.put("weight", String.valueOf(instance.getWeight()));
 params.put("enable", String.valueOf(instance.isEnabled()));
 params.put("healthy", String.valueOf(instance.isHealthy()));
 params.put("ephemeral", String.valueOf(instance.isEphemeral()));
 params.put("metadata", JSON.toJSONString(instance.getMetadata()));
 
 //このメソッドを見れば、ほぼこれだと推測できる。
 reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
//5つのパラメータ
//api リクエストアドレス /nacos/v1/ns/instance
//params リクエスト・パラメーター
//body
//servers nacos これは、nacosクラスタ・モデルのためのアドレス集である。
//method リクエストタイプ ポスト
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
 params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
 //nacosサーバーのリストが存在するかどうかを判断する。
 if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
 throw new NacosException(NacosException.INVALID_PARAM, "no server available");
 }
 
 NacosException exception = new NacosException();
 if (servers != null && !servers.isEmpty()) {
 //登録したいnacosサービスのアドレスを取得する。 
 //ルールはランダム化される
 Random random = new Random(System.currentTimeMillis());
 int index = random.nextInt(servers.size());
 for (int i = 0; i < servers.size(); i++) {
 String server = servers.get(index);
 try {
 // 
 return callServer(api, params, body, server, method);
 } catch (NacosException e) {
 exception = e;
 if (NAMING_LOGGER.isDebugEnabled()) {
 NAMING_LOGGER.debug("request {} failed.", server, e);
 }
 }
 index = (index + 1) % servers.size();
 }
 }
 
 
 ............................................................................
}
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
 throws NacosException {
 //時間を記録する
 long start = System.currentTimeMillis();
 long end = 0;
 injectSecurityInfo(params);
 List<String> headers = builderHeaders();
 String url;
 if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
 //アドレスを要求する
 url = curServer + api;
 } else {
 if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
 curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
 }
 //リクエストアドレスのスプライシング
 //httpまたはhttpsの接頭辞がない場合は
 url = HttpClient.getPrefix() + curServer + api;
 }
 
 //httpリクエストの開始 登録の終了
 HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
 end = System.currentTimeMillis();
 MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
 .observe(end - start);
 if (HttpURLConnection.HTTP_OK == result.code) {
 return result.content;
 }
 if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
 return StringUtils.EMPTY;
 }
 throw new NacosException(result.code, result.content);
}

ナコスサーバー

//nacosクライアントはすでに/nacos/v1/ns/instanceに登録されていることが分かっている。
//コード・モジュールはnacos-namingである。:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
 //サービス名を取得する 
 final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
 final String namespaceId = WebUtils
 .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
 //リクエスト・パラメーターを解析する
 final Instance instance = parseInstance(request);
 //メインライン・ロジック登録
 serviceManager.registerInstance(namespaceId, serviceName, instance);
 return "ok";
}
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
 
 //インスタンスが格納されているコンテナを取得するには、このメソッドを参照する必要がある。
 createEmptyService(namespaceId, serviceName, instance.isEphemeral());
 
 //サービスを取得する
 Service service = getService(namespaceId, serviceName);
 
 if (service == null) {
 throw new NacosException(NacosException.INVALID_PARAM,
 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
 }
 //サービスの登録、つまりサービスの追加はデータの追加である。 
 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

サービス・コンテナの作成

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
 createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
 throws NacosException {
 // 以下は主なメソッドである。
 Service service = getService(namespaceId, serviceName);
 if (service == null) {
 //key public 一般に、キーは使用時に公開される。
 //空の場合は、次のようにコンテナを構築する。
 //コンテナ構造は記事冒頭の図にある。
 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
 service = new Service();
 service.setName(serviceName);
 service.setNamespaceId(namespaceId);
 service.setGroupName(NamingUtils.getGroupName(serviceName));
 // now validate the service. if failed, exception will be thrown
 //これは最終更新時刻を設定するもので、通常はハートビート時刻となる。
 service.setLastModifiedMillis(System.currentTimeMillis());
 service.recalculateChecksum();
 if (cluster != null) {
 cluster.setService(service);
 service.getClusterMap().put(cluster.getName(), cluster);
 }
 service.validate();
 
 putServiceAndInit(service);
 if (!local) {
 addOrReplaceService(service);
 }
 }
}
//作成したサービスをマップコンテナに入れる
private void putServiceAndInit(Service service) throws NacosException {
 //これはpublic namespaceIdをキーとするマップに入れられる。
 putService(service);
 //初期化 これは実はかなり重要だ。
 service.init();
 
 //以下の2行は、作成したコンテナをそれぞれephemeralConsistencyServiceとpersistentConsistencyServiceのlisteners属性に入れるためのコードである。
 //listeners 属性はMapのような構造になっている<String, ConcurrentLinkedQueue<RecordListener>> listeners
 //key  
 //com.alibaba.nacos.naming.iplist.ephemeral+NamespaceId##gorup@@serviceName
 //com.alibaba.nacos.naming.iplist.+NamespaceId##gorup@@serviceName
 //これらの実装クラスはDistroConsistencyServiceImpl RaftConsistencyServiceImplである。
 consistencyService
 .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
 consistencyService
 .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
 Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
 public void init() {
 //インスタンスの健全性をチェックするタスクを作成する
 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
 for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
 entry.getValue().setService(this);
 entry.getValue().init();
 }
}
//
public Service getService(String namespaceId, String serviceName) {
 // serviceMap データ構造Map<String, Map<String, Service>> serviceMap 
 // namespaceId 一般向け
 //最初に登録されたサービスはNULLでなければならない。
 if (serviceMap.get(namespaceId) == null) {
 return null;
 }
 return chooseServiceMap(namespaceId).get(serviceName);
}

インスタンスのヘルスチェック

//これは上記のinitメソッドに従う
public void init() {
 //インスタンスが健全かどうかをチェックするタスクを作成する 
 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
 for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
 entry.getValue().setService(this);
 entry.getValue().init();
 }
}
public static void scheduleCheck(ClientBeatCheckTask task) {
 ///key value
 futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
//このタスクのJavaドキュメントから始めよう。 
//名前からわかるように、これはクライアントのハートビート・チェックであり、HealthCheckTaskというハートビート・チェック・タスクがある。
//Check and update statues of ephemeral instances, remove them if they have been expired.
//何をやっているのか、Youtubeから直接大まかなイメージをつかむことができる。:
//一時的なインスタンスのステータスをチェックして更新し、期限切れの場合は削除する
//実行メソッドを見る
@Override
public void run() {
 try {
 if (!getDistroMapper().responsible(service.getName())) {
 return;
 }
 
 if (!getSwitchDomain().isHealthCheckEnabled()) {
 return;
 }
 
 //ここですべてのインスタンスを取得する
 //ブーリアン・パラメータを追加した理由は、インスタンスには一時的なものと一時的でないものの2種類があり、それぞれ異なるコンテナに配置されるからである。 
 //ephemeralInstances永続的インスタンス
 List<Instance> instances = service.allIPs(true);
 
 // first set health status of instances:
 //フェッチされたインスタンスをループする。
 for (Instance instance : instances) {
 //現在時刻から最後のインスタンス更新時刻を引いた値が、デフォルトのタイムアウト(15秒)より大きいかどうかを判断する。
 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
 if (!instance.isMarked()) {
 //まだ正常な場合
 if (instance.isHealthy()) {
 //不健康に設定する
 instance.setHealthy(false);
 //ログを印刷する
 Loggers.EVT_LOG
 .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
 instance.getIp(), instance.getPort(), instance.getClusterName(),
 service.getName(), UtilsAndCommons.LOCALHOST_SITE,
 instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
 getPushService().serviceChanged(service);
 //イベントを投稿する
 ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
 }
 }
 }
 }
 
 if (!getGlobalConfig().isExpireInstance()) {
 return;
 }
 
 // then remove obsolete instances:
 //この場合のアクションは、サービスを削除することである。 
 for (Instance instance : instances) {
 
 if (instance.isMarked()) {
 continue;
 }
 //現在の時刻から最後のインスタンス更新を引いた値が、デフォルトのタイムアウト(30秒)より大きいかどうかを判断する。
 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
 // delete instance
 Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
 JacksonUtils.toJson(instance));
 //インスタンスを削除する。これは、nacosクラスタ内の他のnacosサービスに通知するためのラップされたhttpリクエストである。 
 deleteIp(instance);
 }
 }
 
 } catch (Exception e) {
 Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
 }
 
}

メインラインの登録

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
 
 //インスタンスが格納されているコンテナを取得するには、このメソッドを参照する必要がある。
 createEmptyService(namespaceId, serviceName, instance.isEphemeral());
 
 //サービスを取得する
 Service service = getService(namespaceId, serviceName);
 
 if (service == null) {
 throw new NacosException(NacosException.INVALID_PARAM,
 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
 }
 //サービスの登録、つまりサービスの追加はデータの追加である。 
 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
 throws NacosException {
 
 String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
 
 //サービスを再取得する
 Service service = getService(namespaceId, serviceName);
 
 // 
 synchronized (service) {
 //この方法は、最初に見ておくことが重要である
 //このメソッドは、現在のサービス名のすべてのインスタンスを取得する。
 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
 
 Instances instances = new Instances();
 instances.setInstanceList(instanceList);
 
 consistencyService.put(key, instances);
 }
}

データのコピー

/**
 *この方法は、オリジナルのdocで説明されているように、インスタンスの最新のセットを比較し、フェッチする。
 *
 * これは自分でも理解しなければならない:
 * このサービスのすべてのインスタンスを最初に取得する
 * このリクエストのインスタンスに加えて
 * このデータをコピーバックする
 */
 public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
 throws NacosException {
 //最初の登録は空である
 	//com.alibaba.nacos.naming.iplist.ephemeral+NamespaceId##gorup@@serviceName
 //com.alibaba.nacos.naming.iplist.+NamespaceId##gorup@@serviceName
 //このデータがconsistencyServiceのどこに格納されるかは後で説明する。
 Datum datum = consistencyService
 .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
 
 //最初の登録は空である
 List<Instance> currentIPs = service.allIPs(ephemeral);
 //主にメモリ節約のため、currentIPsの長さでマップを作成する。
 Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
 //このセットはインスタンスIDを保存するために使用される。
 Set<String> currentInstanceIds = Sets.newHashSet();
 
 //インスタンスが存在する場合、ループする
 for (Instance instance : currentIPs) {
 	//キーIPをスプライシングする:port value インスタンス・オブジェクト
 currentInstances.put(instance.toIpAddr(), instance);
 //currentInstanceIdsに入れる
 currentInstanceIds.add(instance.getInstanceId());
 }
 
 //これは基本的に、返さなければならないデータのセットである。
 Map<String, Instance> instanceMap;
 
 //このデータにデータが存在するかどうかを判断する
 if (datum != null) {
 	//データがあれば、instanceMapにコピーをとる。
 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
 } else {
 	//マップが存在しない場合は、空のマップを作成する。
 instanceMap = new HashMap<>(ips.length);
 }
 //このリクエストのインスタンスをループする
 for (Instance instance : ips) {
 	//サービス.ClusterMapインスタンスが.getClusterName()key((このサービスは登録時に作成されるか、getServerによって取得される)
 	//ClusterMap は、サービスがインスタンス化されたときに作成される。
 if (!service.getClusterMap().containsKey(instance.getClusterName())) {
 	//存在しない場合は、キーをクラスタ名としてクラスタを作成する。
 Cluster cluster = new Cluster(instance.getClusterName(), service);
 //この初期化の中にタスクがある。
 cluster.init();
 //クラスタのサービスを開始する.ClusterMap 
 service.getClusterMap().put(instance.getClusterName(), cluster);
 Loggers.SRV_LOG
 .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
 instance.getClusterName(), instance.toJson());
 }
 //インスタンスが削除されるかどうかはここで判断する
 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
 	//インスタンスを削除する インスタンスマップに存在することを示す、以前存在したインスタンス説明を削除する
 instanceMap.remove(instance.getDatumKey());
 } else {
 	//インスタンスマップに追加する
 instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
 instanceMap.put(instance.getDatumKey(), instance);
 }
 
 }
 //この時に登録したデータが空のままだとエラーになる!
 if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
 throw new IllegalArgumentException(
 "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
 .toJson(instanceMap.values()));
 }
 //リターン例コレクション
 return new ArrayList<>(instanceMap.values());
 }

メインラインの登録


public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
 throws NacosException {
 
 String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
 
 //サービスを再取得する
 Service service = getService(namespaceId, serviceName);
 
 // 
 synchronized (service) {
 //この方法は、最初に見ておくことが重要である
 //このメソッドは、現在のサービス名のすべてのインスタンスを取得する。
 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
 
 //この2行のコードは、フェッチ・インスタンス・コレクションをコピーしているだけだ。 なぜコピーする必要があるのかわからない。
 Instances instances = new Instances();
 instances.setInstanceList(instanceList);
 
 //メインライン
 consistencyService.put(key, instances);
 }
}
@Override
public void put(String key, Record value) throws NacosException {
 //mapConsistencyServiceは、一時的なインスタンスかどうかによって異なるタイプを取得する。 
 //これはDistroConsistencyServiceImplの一時的なインスタンスである。
 mapConsistencyService(key).put(key, value);
}
//この方法は重要である
public void put(String key, Record value) throws NacosException {
 //データをメモリにフラッシュする
 onPut(key, value);
 //nacosクラスタ内の他のnacosサービスに登録データを同期する
 taskDispatcher.addTask(key);
}

登録データをメモリにリフレッシュ


public void onPut(String key, Record value) {
			//この値はインスタンスの集まりである
 if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
 Datum<Instances> datum = new Datum<>();
 datum.value = (Instances) value;
 datum.key = key;
 datum.timestamp.incrementAndGet();
 //インスタンスのコレクションをdataStoreにアセンブルする
 dataStore.put(key, datum);
 }
			
 if (!listeners.containsKey(key)) {
 return;
 }
		//notifierは、実行可能な 
 //この方法は、現在の時刻をノーティファイアのキュー・プロパティに入れることで機能する。
 //次に、このタスクを実行し、イベントを処理するためにキューからイベントを取り出す。
 notifier.addTask(key, ApplyAction.CHANGE);
}
public void addTask(String datumKey, ApplyAction action) {
 //このキーが存在する場合は、次のタスクが処理されるときに削除されるので、そのまま終了する。
 if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
 return;
 }
 if (action == ApplyAction.CHANGE) {
 	//このイベントの名前を保存する
 services.put(datumKey, StringUtils.EMPTY);
 }
 //キューに入れる
 tasks.offer(Pair.with(datumKey, action));
}
//主に見るべきは、ノティファイア・タスクの実行メソッドとその内容である。
public void run() {
 Loggers.DISTRO.info("distro notifier started");
 
 //ここにはデッド・ループがある
 for (; ; ) {
 try {
 	//このキューからデータを取り続ける
 //このデータは、上記のように投入されたイベントである。
 Pair<String, ApplyAction> pair = tasks.take();
 // 
 handle(pair);
 } catch (Throwable e) {
 Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
 }
 }
}
private void handle(Pair<String, ApplyAction> pair) {
 try {
 String datumKey = pair.getValue0();
 ApplyAction action = pair.getValue1();
 //サービスのキーを削除する
 services.remove(datumKey);
 
 int count = 0;
 
 if (!listeners.containsKey(datumKey)) {
 return;
 }
 //このリスナーにデータを入れるプロセスは、上記の
 for (RecordListener listener : listeners.get(datumKey)) {
 
 count++;
 
 try {
 	//を追加する場合
 if (action == ApplyAction.CHANGE) {
 	//登録を処理する
 listener.onChange(datumKey, dataStore.get(datumKey).value);
 continue;
 }
 //登録データを削除する場合
 if (action == ApplyAction.DELETE) {
 listener.onDelete(datumKey);
 continue;
 }
 } catch (Throwable e) {
 Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
 }
 }
 
 if (Loggers.DISTRO.isDebugEnabled()) {
 Loggers.DISTRO
 .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
 datumKey, count, action.name());
 }
 } catch (Throwable e) {
 Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
 }
 }
 
 
 @Override
 public void onChange(String key, Instances value) throws Exception {
 
 Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
 
 for (Instance instance : value.getInstanceList()) {
 
 if (instance == null) {
 // Reject this abnormal instance list:
 throw new RuntimeException("got null instance " + key);
 }
 
 if (instance.getWeight() > 10000.0D) {
 instance.setWeight(10000.0D);
 }
 
 if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
 instance.setWeight(0.01D);
 }
 }
 // 
 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
 
 recalculateChecksum();
 }
 
 
 public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
 Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
 for (String clusterName : clusterMap.keySet()) {
 ipMap.put(clusterName, new ArrayList<>());
 }
 
 for (Instance instance : instances) {
 try {
 if (instance == null) {
 Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
 continue;
 }
 
 if (StringUtils.isEmpty(instance.getClusterName())) {
 instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
 }
 
 if (!clusterMap.containsKey(instance.getClusterName())) {
 Loggers.SRV_LOG
 .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
 instance.getClusterName(), instance.toJson());
 Cluster cluster = new Cluster(instance.getClusterName(), this);
 cluster.init();
 getClusterMap().put(instance.getClusterName(), cluster);
 }
 
 List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
 if (clusterIPs == null) {
 clusterIPs = new LinkedList<>();
 ipMap.put(instance.getClusterName(), clusterIPs);
 }
 
 clusterIPs.add(instance);
 } catch (Exception e) {
 Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
 }
 }
 
 for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
 //make every ip mine
 //ここで新しいインスタンスをclusterMapに入れる。
 //updateIps このメソッドには他にもいくつかコードがあるので、ここでは割愛する。 
 List<Instance> entryIPs = entry.getValue();
 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
 }
 
 setLastModifiedMillis(System.currentTimeMillis());
 
 // 
 getPushService().serviceChanged(this);
 StringBuilder stringBuilder = new StringBuilder();
 
 for (Instance instance : allIPs()) {
 stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
 }
 
 Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
 stringBuilder.toString());
 
 }

nacosクラスタ内の他のnacosサービスへの登録データの同期

//com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.put(String, Record)
public void put(String key, Record value) throws NacosException {
 onPut(key, value);
 //nacosクラスタ内の他のnacosサービスに同期する
 //コードを見るな
 taskDispatcher.addTask(key);
}
Read next

var、let、constの違い

Varの概要:1、変数のvarの定義は、全体の閉じた関数です 2、変数の昇格に関係なく、最初の数行内の関数内の変数のvarの定義は、スコープの先頭に昇格されます 3、変数のvarの定義は、変数の定義を繰り返すように変更することができ、変数の次の定義は、上記の定義された変数によってカバーされます。

May 30, 2020 · 2 min read