blog

dubbo-goのfailbackClusterについて話す。

この記事の一連の流れは、ダボゴーの-go-goの研究に焦点を当てています。...

Jan 30, 2021 · 5 min. read
シェア

シーケンス

この記事ではdubbo-goのfailbackClusterを取り上げます。

failbackCluster

type failbackCluster struct{}
const failback = "failback"
func init() {
	extension.SetCluster(failback, NewFailbackCluster)
}
// NewFailbackCluster ...
func NewFailbackCluster() cluster.Cluster {
	return &failbackCluster{}
}
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
	return newFailbackClusterInvoker(directory)
}
  • failbackClusterinvoker.taskListのjoinメソッドは newFailbackClusterInvokerを実行します。

newFailbackClusterInvoker

type failbackClusterInvoker struct {
	baseClusterInvoker
	once sync.Once
	ticker *time.Ticker
	maxRetries int64
	failbackTasks int64
	taskList *queue.Queue
}
func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
	invoker := &failbackClusterInvoker{
		baseClusterInvoker: newBaseClusterInvoker(directory),
	}
	retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
	retries, err := strconv.Atoi(retriesConfig)
	if err != nil || retries < 0 {
		logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
		retries = constant.DEFAULT_FAILBACK_TIMES_INT
	}
	failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
	if failbackTasksConfig <= 0 {
		failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
	}
	invoker.maxRetries = int64(retries)
	invoker.failbackTasks = failbackTasksConfig
	return invoker
}
  • newFailbackClusterInvoker メソッドは failbackClusterInvoker を作成し、maxRetries と failbackTasks プロパティを設定します。

Invoke

func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
	invokers := invoker.directory.List(invocation)
	err := invoker.checkInvokers(invokers, invocation)
	if err != nil {
		logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.
",
			invocation.MethodName(), invoker.GetUrl().Service(), err)
		return &protocol.RPCResult{}
	}
	url := invokers[0].GetUrl()
	methodName := invocation.MethodName()
	//Get the service loadbalance config
	lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
	//Get the service method loadbalance config if have
	if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
		lb = v
	}
	loadbalance := extension.GetLoadbalance(lb)
	invoked := make([]protocol.Invoker, 0, len(invokers))
	var result protocol.Result
	ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
	//DO INVOKE
	result = ivk.Invoke(ctx, invocation)
	if result.Error() != nil {
		invoker.once.Do(func() {
			invoker.taskList = queue.New(invoker.failbackTasks)
			go invoker.process(ctx)
		})
		taskLen := invoker.taskList.Len()
		if taskLen >= invoker.failbackTasks {
			logger.Warnf("tasklist is too full > %d.
", taskLen)
			return &protocol.RPCResult{}
		}
		timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
		invoker.taskList.Put(timerTask)
		logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.
",
			methodName, url.Service(), result.Error().Error())
		// ignore
		return &protocol.RPCResult{}
	}
	return result
}

Destroy

func (invoker *failbackClusterInvoker) Destroy() {
	invoker.baseClusterInvoker.Destroy()
	// stop ticker
	if invoker.ticker != nil {
		invoker.ticker.Stop()
	}
	_ = invoker.taskList.Dispose()
}
  • Destroyメソッドに追加します。.baseClusterInvoker.Destroy()invoker.process(ctx).ticker.Stop()invoker.process(ctx).taskList.Dispose()

process

func (invoker *failbackClusterInvoker) process(ctx context.Context) {
	invoker.ticker = time.NewTicker(time.Second * 1)
	for range invoker.ticker.C {
		// check each timeout task and re-run
		for {
			value, err := invoker.taskList.Peek()
			if err == queue.ErrDisposed {
				return
			}
			if err == queue.ErrEmptyQueue {
				break
			}
			retryTask := value.(*retryTimerTask)
			if time.Since(retryTask.lastT).Seconds() < 5 {
				break
			}
			// ignore return. the get must success.
			_, err = invoker.taskList.Get(1)
			if err != nil {
				logger.Warnf("get task found err: %v
", err)
				break
			}
			go func(retryTask *retryTimerTask) {
				invoked := make([]protocol.Invoker, 0)
				invoked = append(invoked, retryTask.lastInvoker)
				retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
				var result protocol.Result
				result = retryInvoker.Invoke(ctx, retryTask.invocation)
				if result.Error() != nil {
					retryTask.lastInvoker = retryInvoker
					invoker.checkRetry(retryTask, result.Error())
				}
			}(retryTask)
		}
	}
}

checkRetry

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
	logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.
",
		retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
	retryTask.retries++
	retryTask.lastT = time.Now()
	if retryTask.retries > invoker.maxRetries {
		logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.
",
			retryTask.retries, retryTask.invocation)
	} else {
		invoker.taskList.Put(retryTask)
	}
}

まとめ

Read next

JavaScriptのキャッシュAPI

JavaScriptでCache APIを使ってリソースをキャッシュする方法を学びましょう。 Cache APIを使うと、サービスワーカーがキャッシュするリソースを制御できるようになります。Cache API を使用すると、サービスワーカーはオフラインで使用するリソースをキャッシュし、後で取得することができます。 Cache...

Jan 28, 2021 · 4 min read