背景
電流リミッターで一般的に使用されるアルゴリズムは、トークンバケットアルゴリズムです。これは簡単に言うと、トークンを保持するために使用できる一定の容量のバケツがあるということです。システムはトークンを一定の割合でバケツに入れ、トークンの数がバケツのサイズを超えると廃棄します。ユーザーはバケツからトークンを取り出し、トークンを得た場合のみ操作を実行できます。
基本構造
time/rateには2つの基本構造があります。 パラメータの意味をよく理解した上でコードを実行すると、より理解が深まりますので、まずは2つの基本構造から説明しましょう。
Limiter
type Limit float64
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
最初のパラメータは mu で、これはリミッター内のパラメータの更新を制御するために使用され、パラメータの更新操作は相互に排他的です。2番目のパラメータは limit で、これは基本的に float64 で、リミッタがトークンを生成するレートを表します。これはバケットが保持できるトークン数のサイズです。4番目のパラメータ tokens は、現在バケツに入っているトークンの数です。5番目のパラメータ last は time 型のパラメータで、バケツ内のトークンが最後に更新された時刻を表します。6番目のパラメータ lastEvent も時間型のパラメータで、イベントが最後に実行された日時を表します。
Reservation
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
この構造はもう少し面白く、一種の予約操作です。与えられた制限時間内に、予約が可能であれば、それを実行します。パラメータを説明します。最初のパラメータはokで、これはユーザー予約が予約されていないことを意味し、trueは予約されていることを意味します。2番目のパラメータはlimで、特定のリミッターを指定します。3つ目のパラメータはtokensで、今回持っている予約の数を意味します。4番目のパラメータはtimeToActで、これも興味深いです。5番目のパラメータはlimitで、アポイントメント中にリミッターがトークンを生成する割合です。 実際、リミッターがトークンを生成する割合は変数limを使うことで得られますが、なぜこのパラメータを別にする必要があるのでしょうか?リミッターのリミットは可変であり、リミットのパラメータを変更する方法は次のとおりだからです。
// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
func (lim *Limiter) SetLimit(newLimit Limit) {
lim.SetLimitAt(time.Now(), newLimit)
}
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens
lim.limit = newLimit
}
関係する方法
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation s OK() method returns false if n exceeds the Limiter's burst size.
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
一番よく使われるのはadvanceで、これは主に、今到着する時刻になったときに、リミッターに何枚のメダルを入れておくかを更新するために使われます。before(last)は、イベント操作の時刻と操作の順序の時刻が必ずしも一致しないシナリオで、内部ではnow.before(last)に注意を払う必要がある場合に現れることがあります。
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
reserveN
リザーブNのソースコードは以下の通りです:
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
1つは、nがトークン・バケットの容量以下であること、もう1つは、トークン・バケットが十分なトークンを生成できるように、ユーザが十分な時間待つことができることです。timeToAct=now.Add(waitDuration)この2つの条件が満たされた場合、ユーザはタスクを実行する時間を与えられます。今の時点で、トークンの数からnを引いた値がマイナスであれば、ユーザーは待つ必要があるということになります。では、待機が実行できる条件とは何でしょうか?timeToAct時点において、現時点でのトークン・バケットの容量が0であり、負の値であること。これは一種の予約操作であり、今あなたが消費するのに十分な容量がないので、もしあなたが待つことができるのであれば、あなたが必要とするものを先に生産します。
CancelAt
実は、もう一つ問題が残っています。それは、ユーザーが時間を予約したのに、キャンセルしてしまうことです。ではどうすればいいのでしょうか?そこでCancelAtの登場です。
// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))理解するのが難しいのは、この操作です。作者はなぜこのように縮小したのでしょうか?restoreTokens := float64(r.tokens)なぜ?こうすれば、この部分で生成されたトークンは無駄にならず、トークンを必要とする後続のユーザーが消費できるようになります。
トークンバケットアルゴリズムによれば、以下の2つのことがわかります:
- トークンは一定の割合で生成されるため、一定期間に生成できるトークン数には制限があります。
- トークン・バケットには固定サイズがあるため、ある期間に生成されたトークンを使用するユーザーがいない場合、これらのトークンを無期限に保存することはできません。
トークンの生成は、タイムラインをスライスし、トークンに基づいて実行権限を受け取るすべてのイベントをタイムラインのセグメントのように扱うこととして理解できます。中央の直線はタイムラインと解釈することができ、直線上の各矢印はトークンと解釈することができます。下図のイベントNの横にある2本の数直線は、このイベントが占めるトークンの数を表しています。
では、イベント2が赤い矢印の時点でCancelAtアクションを実行した場合、このイベントによって占有されたトークンの影響をどのように回復するのでしょうか?
まず、restoreTokens を追加した場合の影響から説明します!**restoreTokensを一定数追加することで、イベント4実行時のトークン数がrestoreTokensに変わるため、イベント4が実行されるタイミングで実行されるイベントが来た場合、その操作を実行するためのトークンを取得することが可能になります。** その場合、新しいイベントとイベント4が実行される時点が重なります。このとき、トークンバケットアルゴリズムの基本的な意味を考えてみると、本来、イベント4の実行時間帯にトークンはイベント4の実行分しかないのですが、イベント2のキャンセルにより、イベント4の実行時間帯に新たなイベントも実行されることになりますが、この時間帯に発生するトークンの数はイベント4分しかなく、トークンバケットアルゴリズムに違反します。
筆者の考え方は、 、後続のイベントに必要なトークン数が、イベント 2 に必要な r.tokens の数より少ない場合は、この差を加算します。この restoreTokens を加算した後、上記のように後続イベントとイベント 4 の実行時間帯が重なりますが、重なった時間帯とイベント 3 とイベント 4 の間の時間帯を合わせて、キャンセルされた r.tokens の数として一括して扱うことができます。これにより、影響は許容できるレベルまで軽減されます。
この論文におけるイベント3とイベント4は、いくつでもあり得ますし、本文中では単に理解を深めるために具体例を挙げています。





