サードパーティ API と対話する分散アプリケーションがあるシナリオを想像してみましょう。通常、サードパーティ API には、クライアントがリクエストをバーストさせてサービスのダウンタイムを引き起こすことを避けるために、レート制限制御メカニズムが備わっています。このようなシナリオでは、呼び出し元は分散環境でサードパーティ API への送信リクエストの速度をどのように制御できるでしょうか?この投稿では、この問題に対する考えられる戦略について説明します。
リクエストのレートを制御するアルゴリズムは複数ありますが、理解と実装が比較的簡単なため、ここではトークン バケット アルゴリズムに焦点を当てます。このアルゴリズムは次のように述べています: バケットは最大 T トークンを保持でき、アプリケーションがサードパーティ API にリクエストを行う場合は、1 を取る必要があります。バケットからのトークン。バケットが空の場合は、バケット内に少なくとも 1 トークンが存在するまで待機する必要があります。また、バケットには R トークン/ミリ秒の固定レートで 1 トークンが補充されます。
トークン バケット アルゴリズムは非常に簡単に理解できますが、分散環境でこれを使用してサードパーティ API への送信リクエストを制御するにはどうすればよいですか?
分散環境で送信レート制限を制御したい場合は、現在のレート制限に関する一元的な信頼できる情報源が必要です。真実の情報源を実装するには複数の方法がありますが、私は考えられる実装を示して次の図を理想化しました。
上の図では、複数のポッドに分散アプリケーションがあり、各ポッドはサードパーティ API にリクエストを行うことができます。アプリケーション インフラストラクチャには、トークン バケット アルゴリズムを使用してレート制限を制御する TCP サーバーがあります。サードパーティ API にリクエストを行う前に、ポッドは TCP サーバーに新しいトークンを要求し、使用可能なトークンが少なくとも 1 つ存在するまで、TCP サーバーからの応答を待ちます。トークンが利用可能になった後、ポッドはサードパーティ API にリクエストを作成します。
TCP サーバーの実装は、このリポジトリ https://github.com/rafaquelhodev/rlimit/ にあります。次のセクションでは、golang でのトークン バケットの実装について簡単に説明します。
以下に、トークン バケットの実装の背後にある主なアイデアを示します。詳細な実装を理解するには、https://github.com/rafaquelhodev/rlimit/ リポジトリをご覧ください。
レート制限制御は TokenBucket 構造体に集中されています:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
TokenBucket 構造体に subs プロパティがあることがわかります。基本的に、これは特定のトークン バケットのサブスクライバーの配列です。クライアントからトークンが要求されるたびに、クライアントは subs 配列に追加され、新しいトークンがバケットに追加されるとクライアントに通知されます。
バケットを開始するとき、バケットがサポートできるトークンの最大数 (maxTokens) と、トークンがバケットに追加される時間 (refillPeriod) を指定する必要があります:
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
ここで、「トークンはどのようにしてバケットに追加されるのか?」と疑問に思うかもしれません。そのため、バケットの作成時に cron ジョブが開始され、refillPeriod ミリ秒ごとに新しいトークンがバケットに追加されます:
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub最後に、クライアントがバケットからトークンを取得したい場合は、waitAvailable 関数を呼び出す必要があります:
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock()https://github.com/Mohamed-khattab/Token-bucket-rate-limiter からインスピレーションを得ました
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3