私にスプリングのソースコードを読めと言うのは無駄なことです!
説明
大まかなフローチャート
1. nacos-configについては後述する。
springnacosの統合
上図は、プロセス全体を3つの部分に大別したものである。
1. springnacosを統合する
2. nacosクライアント側の処理
3. nacosサーバー側の処理
nacos
1. サードパーティコンポーネントの一般的な春の統合では、少なくとも2つのパッケージが存在する。1つはspring-boot/cloud-XXX-starter用、もう1つはサービスコアパッケージ用で、つまりスタートアップパッケージとコアパッケージである。
2. springnacosサービスとディスカバリーを統合するためのスターターパッケージはspring-cloud-Alibaba-nacoc-discoveryである。
3. このパッケージが行うのは、現在のジャーのスプリングを登録することである。.factoriesクラスをspringiocコンテナに登録する。<spring-boot-XXX-starterとその原則を手書きする>
//すべてはNacosServiceRegistryAutoConfigurationから始まる。
//NacosServiceRegistryAutoConfigurationspring-cloud-Alibaba-nacoc-discoveryにある。.factoriesnacosクラスタに登録されている
//com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration.nacosServiceRegistry(NacosDiscoveryProperties)
//nacosServiceRegistryを作成する
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
nacosServiceRegistry含まれるプロパティ
NacosDiscoveryProperties
NamingService
//このメソッドは、nacosServiceRegistryの2つのプロパティにそれぞれ値を代入しているだけのように見える。
//NacosDiscoveryPropertiesこれはコンフィギュレーション・パラメータ・クラスである。@Bean
//主なクラスはNamingServiceで、これはnacosのコンポーネント・クラスである。
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
public NamingService namingServiceInstance() {
//ダイレクト・リターンがある場合
if (null != namingService) {
return namingService;
}
try {
//お持ちでない場合は作成する
namingService = NacosFactory.createNamingService(getNacosProperties());
}
catch (Exception e) {
log.error("create naming service error!properties={},e=,", this, e);
return null;
}
return namingService;
}
//getNacosPropertiesこの方法では、サービスに関するいくつかの情報を得る。
//
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
//
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
//リフレクションによって作成されたことに気づくだろう。
//clssオブジェクトを取得する
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
//パラメトリック・コンストラクタを呼び出す
NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
//com.alibaba.nacos.client.naming.NacosNamingService.NacosNamingService(Properties)
public NacosNamingService(Properties properties) {
init(properties);
}
//初期化
private void init(Properties properties) {
//名前空間パブリック
//以下のコード行は、その名の通り初期化作業を行うものだが、私はあまり見ていない。
namespace = InitUtils.initNamespaceForNaming(properties);
initServerAddr(properties);
InitUtils.initWebRootContext();
initCacheDir();
initLogName(properties);
eventDispatcher = new EventDispatcher();
//この中には、nacosサービス自体に特有と思われる時限タスクが2つあるが、私は見ていない。
serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
//ハートビートという名前から推測できるように、ハートビートを送信するタスクを処理する時間指定スレッドのプールがある。
//initClientBeatThreadCountこのメソッドは、マシンに基づいてスレッド数を選択する。
beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
//これはサービスの変更を処理するタスクである。
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
initPollingThreadCount(properties));
}
nacos
//nacosクライアントの起動元はスプリングリスナーである
//NacosAutoServiceRegistrationはNacosServiceRegistryAutoConfigurationにある。@bean
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
NacosAutoServiceRegistration(nacosを実装している。
ApplicationListener<WebServerInitializedEvent> ウェブの初期化イベントをリッスンする
//リスナーの原理についてはここでは触れないが、スプリング・リスナーの原理についてはいつか別の記事を書くつもりだ。
//ここで知っておくべきことは、リスニング・イベントがトリガーされると、実装クラスのonApplicationEventメソッドが呼び出されるということだ。
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
//イベント実行エントリーをリスニングする
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
//
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
//登録を開始する
//このコードもすべてspring-cloudで実行され、いくつかの共通インターフェースを提供する。
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
@Override
//現在配置されているNacosAutoServiceRegistrationクラス
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
//親クラスのregisterメソッドを呼び出す
super.register();
}
//現在位置 AbstractAutoServiceRegistration
protected void register() {
//serviceRegistryプロパティのメソッドを呼び出す
//上記のコードは、全体的に仕事をするようになる:
//実際の登録コードは、AbstractAutoServiceRegistrationのserviceRegistryプロパティのregisterから始まる。
//nacosNacosServiceRegistryクラスはServiceRegistryインターフェイスを実装している。
//NacosAutoServiceRegistrationAbstractAutoServiceRegistration を継承する。
//現在のNacosAutoServiceRegistrationクラスの実装は、プロパティserviceRegistryを持ち、これはnacosが提供するNacos Service Registryによって実装されている。
//全体的な考え方は、スプリングが共通のコードとインターフェースのセットを提供し、サードパーティーのコンポーネントがそれらを実装するというものだ。
this.serviceRegistry.register(getRegistration());
}
//nacos開始するコード
//現在のクラスは、前述の春のnacos統合で使用されたものである。 @bean 登録済み
@Override
public void register(Registration registration) {
//サービス名であるserviceIdを取得する
//マイクロサービス用にspringをセットアップしていない場合は.application.name サービスがnacosに登録されていない
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
//ip、ポートなどのインスタンス情報をカプセル化する。
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//namingServiceの登録メソッドを呼び出す
//前述のように、namingServiceはリフレクションによって作成され、そこにはいくつかの初期化情報がある。
//例えば、ハートビート送信用のスレッド・プールはinitで作成され、プロパティBeatReactorに格納される。...nacos
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//一時的なインスタンスかどうか。
if (instance.isEphemeral()) {
//ハートビートの送信に必要な情報をラップする
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
//ハートビートを送信するタスクを作成する。
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
//メインライン登録コード
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
ハートビート・タスクの送信
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
//サービス名がservice-ribbon-test01の場合、キーを取得する。
//キーはDEFAULT_GROUP@@service-ribbon-test01#.109#7001
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
//dom2Beatを入れる......正直なところ、これが何をするものなのか知らないし、実際に見たこともない。
dom2Beat.put(key, beatInfo);
//Runnableを実装したタスクBeatTaskを作成する。
//デフォルトでは、このメソッドは5秒後に実行されるが、実行されるのは1回だけである。 ハートビートが常に送信される理由は、BeatTaskで処理される。
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
//BeatTask
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
//このメソッドを見る
public void run() {
if (beatInfo.isStopped()) {
return;
}
//インターバルを記録する
long nextTime = beatInfo.getPeriod();
try {
//カプセル化されたHTTPリクエストであるハートビートの送信方法。
JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.getIntValue("clientBeatInterval");
boolean lightBeatEnabled = false;
if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.containsKey(CommonParams.CODE)) {
code = result.getIntValue(CommonParams.CODE);
}
//この場合、再度登録する。
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ne) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
}
//ハートビート送信後、一定時間待って再度beattaskタスクを作成する。
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
登録
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
//リクエスト情報をカプセル化する
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
//このメソッドを見れば、ほぼこれだと推測できる。
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
//5つのパラメータ
//api リクエストアドレス /nacos/v1/ns/instance
//params リクエスト・パラメーター
//body
//servers nacos これは、nacosクラスタ・モデルのためのアドレス集である。
//method リクエストタイプ ポスト
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
//nacosサーバーのリストが存在するかどうかを判断する。
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (servers != null && !servers.isEmpty()) {
//登録したいnacosサービスのアドレスを取得する。
//ルールはランダム化される
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
//
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
............................................................................
}
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
throws NacosException {
//時間を記録する
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
List<String> headers = builderHeaders();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
//アドレスを要求する
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
//リクエストアドレスのスプライシング
//httpまたはhttpsの接頭辞がない場合は
url = HttpClient.getPrefix() + curServer + api;
}
//httpリクエストの開始 登録の終了
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.observe(end - start);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(result.code, result.content);
}
ナコスサーバー
//nacosクライアントはすでに/nacos/v1/ns/instanceに登録されていることが分かっている。
//コード・モジュールはnacos-namingである。:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//サービス名を取得する
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//リクエスト・パラメーターを解析する
final Instance instance = parseInstance(request);
//メインライン・ロジック登録
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//インスタンスが格納されているコンテナを取得するには、このメソッドを参照する必要がある。
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//サービスを取得する
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//サービスの登録、つまりサービスの追加はデータの追加である。
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
サービス・コンテナの作成
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 以下は主なメソッドである。
Service service = getService(namespaceId, serviceName);
if (service == null) {
//key public 一般に、キーは使用時に公開される。
//空の場合は、次のようにコンテナを構築する。
//コンテナ構造は記事冒頭の図にある。
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
//これは最終更新時刻を設定するもので、通常はハートビート時刻となる。
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
//作成したサービスをマップコンテナに入れる
private void putServiceAndInit(Service service) throws NacosException {
//これはpublic namespaceIdをキーとするマップに入れられる。
putService(service);
//初期化 これは実はかなり重要だ。
service.init();
//以下の2行は、作成したコンテナをそれぞれephemeralConsistencyServiceとpersistentConsistencyServiceのlisteners属性に入れるためのコードである。
//listeners 属性はMapのような構造になっている<String, ConcurrentLinkedQueue<RecordListener>> listeners
//key
//com.alibaba.nacos.naming.iplist.ephemeral+NamespaceId##gorup@@serviceName
//com.alibaba.nacos.naming.iplist.+NamespaceId##gorup@@serviceName
//これらの実装クラスはDistroConsistencyServiceImpl RaftConsistencyServiceImplである。
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void init() {
//インスタンスの健全性をチェックするタスクを作成する
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
//
public Service getService(String namespaceId, String serviceName) {
// serviceMap データ構造Map<String, Map<String, Service>> serviceMap
// namespaceId 一般向け
//最初に登録されたサービスはNULLでなければならない。
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
インスタンスのヘルスチェック
//これは上記のinitメソッドに従う
public void init() {
//インスタンスが健全かどうかをチェックするタスクを作成する
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
public static void scheduleCheck(ClientBeatCheckTask task) {
///key value
futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
//このタスクのJavaドキュメントから始めよう。
//名前からわかるように、これはクライアントのハートビート・チェックであり、HealthCheckTaskというハートビート・チェック・タスクがある。
//Check and update statues of ephemeral instances, remove them if they have been expired.
//何をやっているのか、Youtubeから直接大まかなイメージをつかむことができる。:
//一時的なインスタンスのステータスをチェックして更新し、期限切れの場合は削除する
//実行メソッドを見る
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
//ここですべてのインスタンスを取得する
//ブーリアン・パラメータを追加した理由は、インスタンスには一時的なものと一時的でないものの2種類があり、それぞれ異なるコンテナに配置されるからである。
//ephemeralInstances永続的インスタンス
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
//フェッチされたインスタンスをループする。
for (Instance instance : instances) {
//現在時刻から最後のインスタンス更新時刻を引いた値が、デフォルトのタイムアウト(15秒)より大きいかどうかを判断する。
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
//まだ正常な場合
if (instance.isHealthy()) {
//不健康に設定する
instance.setHealthy(false);
//ログを印刷する
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
//イベントを投稿する
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
//この場合のアクションは、サービスを削除することである。
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
//現在の時刻から最後のインスタンス更新を引いた値が、デフォルトのタイムアウト(30秒)より大きいかどうかを判断する。
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
//インスタンスを削除する。これは、nacosクラスタ内の他のnacosサービスに通知するためのラップされたhttpリクエストである。
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
メインラインの登録
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//インスタンスが格納されているコンテナを取得するには、このメソッドを参照する必要がある。
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//サービスを取得する
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//サービスの登録、つまりサービスの追加はデータの追加である。
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//サービスを再取得する
Service service = getService(namespaceId, serviceName);
//
synchronized (service) {
//この方法は、最初に見ておくことが重要である
//このメソッドは、現在のサービス名のすべてのインスタンスを取得する。
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}
データのコピー
/**
*この方法は、オリジナルのdocで説明されているように、インスタンスの最新のセットを比較し、フェッチする。
*
* これは自分でも理解しなければならない:
* このサービスのすべてのインスタンスを最初に取得する
* このリクエストのインスタンスに加えて
* このデータをコピーバックする
*/
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
//最初の登録は空である
//com.alibaba.nacos.naming.iplist.ephemeral+NamespaceId##gorup@@serviceName
//com.alibaba.nacos.naming.iplist.+NamespaceId##gorup@@serviceName
//このデータがconsistencyServiceのどこに格納されるかは後で説明する。
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
//最初の登録は空である
List<Instance> currentIPs = service.allIPs(ephemeral);
//主にメモリ節約のため、currentIPsの長さでマップを作成する。
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
//このセットはインスタンスIDを保存するために使用される。
Set<String> currentInstanceIds = Sets.newHashSet();
//インスタンスが存在する場合、ループする
for (Instance instance : currentIPs) {
//キーIPをスプライシングする:port value インスタンス・オブジェクト
currentInstances.put(instance.toIpAddr(), instance);
//currentInstanceIdsに入れる
currentInstanceIds.add(instance.getInstanceId());
}
//これは基本的に、返さなければならないデータのセットである。
Map<String, Instance> instanceMap;
//このデータにデータが存在するかどうかを判断する
if (datum != null) {
//データがあれば、instanceMapにコピーをとる。
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
//マップが存在しない場合は、空のマップを作成する。
instanceMap = new HashMap<>(ips.length);
}
//このリクエストのインスタンスをループする
for (Instance instance : ips) {
//サービス.ClusterMapインスタンスが.getClusterName()key((このサービスは登録時に作成されるか、getServerによって取得される)
//ClusterMap は、サービスがインスタンス化されたときに作成される。
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
//存在しない場合は、キーをクラスタ名としてクラスタを作成する。
Cluster cluster = new Cluster(instance.getClusterName(), service);
//この初期化の中にタスクがある。
cluster.init();
//クラスタのサービスを開始する.ClusterMap
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
//インスタンスが削除されるかどうかはここで判断する
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
//インスタンスを削除する インスタンスマップに存在することを示す、以前存在したインスタンス説明を削除する
instanceMap.remove(instance.getDatumKey());
} else {
//インスタンスマップに追加する
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
instanceMap.put(instance.getDatumKey(), instance);
}
}
//この時に登録したデータが空のままだとエラーになる!
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
//リターン例コレクション
return new ArrayList<>(instanceMap.values());
}
メインラインの登録
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//サービスを再取得する
Service service = getService(namespaceId, serviceName);
//
synchronized (service) {
//この方法は、最初に見ておくことが重要である
//このメソッドは、現在のサービス名のすべてのインスタンスを取得する。
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//この2行のコードは、フェッチ・インスタンス・コレクションをコピーしているだけだ。 なぜコピーする必要があるのかわからない。
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//メインライン
consistencyService.put(key, instances);
}
}
@Override
public void put(String key, Record value) throws NacosException {
//mapConsistencyServiceは、一時的なインスタンスかどうかによって異なるタイプを取得する。
//これはDistroConsistencyServiceImplの一時的なインスタンスである。
mapConsistencyService(key).put(key, value);
}
//この方法は重要である
public void put(String key, Record value) throws NacosException {
//データをメモリにフラッシュする
onPut(key, value);
//nacosクラスタ内の他のnacosサービスに登録データを同期する
taskDispatcher.addTask(key);
}
登録データをメモリにリフレッシュ
public void onPut(String key, Record value) {
//この値はインスタンスの集まりである
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//インスタンスのコレクションをdataStoreにアセンブルする
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//notifierは、実行可能な
//この方法は、現在の時刻をノーティファイアのキュー・プロパティに入れることで機能する。
//次に、このタスクを実行し、イベントを処理するためにキューからイベントを取り出す。
notifier.addTask(key, ApplyAction.CHANGE);
}
public void addTask(String datumKey, ApplyAction action) {
//このキーが存在する場合は、次のタスクが処理されるときに削除されるので、そのまま終了する。
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
//このイベントの名前を保存する
services.put(datumKey, StringUtils.EMPTY);
}
//キューに入れる
tasks.offer(Pair.with(datumKey, action));
}
//主に見るべきは、ノティファイア・タスクの実行メソッドとその内容である。
public void run() {
Loggers.DISTRO.info("distro notifier started");
//ここにはデッド・ループがある
for (; ; ) {
try {
//このキューからデータを取り続ける
//このデータは、上記のように投入されたイベントである。
Pair<String, ApplyAction> pair = tasks.take();
//
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, ApplyAction> pair) {
try {
String datumKey = pair.getValue0();
ApplyAction action = pair.getValue1();
//サービスのキーを削除する
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
//このリスナーにデータを入れるプロセスは、上記の
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
//を追加する場合
if (action == ApplyAction.CHANGE) {
//登録を処理する
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
//登録データを削除する場合
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
//ここで新しいインスタンスをclusterMapに入れる。
//updateIps このメソッドには他にもいくつかコードがあるので、ここでは割愛する。
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
//
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
nacosクラスタ内の他のnacosサービスへの登録データの同期
//com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.put(String, Record)
public void put(String key, Record value) throws NacosException {
onPut(key, value);
//nacosクラスタ内の他のnacosサービスに同期する
//コードを見るな
taskDispatcher.addTask(key);
}





