Imaginons un scénario dans lequel on dispose d'une application distribuée qui interagit avec une API tierce. Habituellement, les API tierces disposent d'un mécanisme de contrôle de limite de débit afin d'éviter à leurs clients d'éclater les requêtes et de provoquer des temps d'arrêt de leurs services. Dans un tel scénario, comment l'appelant peut-il contrôler le taux de requêtes sortantes vers l'API tierce dans un environnement distribué ? Cet article discute d'une stratégie possible pour ce problème.
Il existe plusieurs algorithmes pour contrôler le taux de requêtes, mais ici nous nous concentrerons sur l'algorithme du bucket de jetons, car il est relativement facile à comprendre et à mettre en œuvre. Cet algorithme indique que : un compartiment peut contenir un maximum de T jetons, et lorsqu'une application souhaite faire une requête à l'API tierce, elle doit prendre 1 jeton du seau. Si le compartiment est vide, il doit attendre qu'il y ait au moins 1 jeton dans le compartiment. De plus, le seau est rempli avec 1 jeton à un taux fixe de R jetons/millisecondes.
L'algorithme du token bucket est très simple à comprendre, mais comment quelqu'un peut-il l'utiliser dans un environnement distribué pour contrôler la requête sortante vers des API tierces ?
Si l'on souhaite contrôler la limite de débit sortant dans un environnement distribué, une source centralisée de vérité pour la limite de débit actuelle est nécessaire. Il existe plusieurs façons d'implémenter la source de vérité et j'ai idéalisé le diagramme suivant avec une implémentation possible :
Dans la figure ci-dessus, nous avons une application distribuée dans plusieurs pods, et chaque pod peut envoyer des requêtes à une API tierce. Dans l'infrastructure de l'application, il existe un serveur TCP qui contrôle la limite de débit à l'aide de l'algorithme du compartiment à jetons. Avant de faire une requête à l'API tierce, le pod demande un nouveau jeton au serveur TCP et le pod attend une réponse du serveur TCP jusqu'à ce qu'il y ait au moins un jeton disponible. Une fois qu'un jeton est disponible, le pod envoie la demande à l'API tierce.
L'implémentation du serveur TCP peut être trouvée dans ce référentiel https://github.com/rafaquelhodev/rlimit/ et dans la section suivante, je discuterai brièvement de l'implémentation du bucket de jetons dans Golang.
Ci-dessous, je montre les idées principales derrière la mise en œuvre du compartiment à jetons. Veuillez jeter un œil au référentiel https://github.com/rafaquelhodev/rlimit/ pour comprendre la mise en œuvre détaillée.
Le contrôle des limites de débit est centralisé dans la structure TokenBucket :
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
Vous pouvez remarquer qu'il existe une propriété subs dans la structure TokenBucket. Fondamentalement, il s'agit d'un tableau d'abonnés pour un compartiment de jetons spécifique : chaque fois qu'un jeton est demandé à un client, le client est ajouté au tableau subs et le client est averti lorsqu'un nouveau jeton est ajouté au compartiment.
Lors du démarrage du bucket, nous devons fournir un nombre maximum de jetons que le bucket peut prendre en charge (maxTokens) et la durée pendant laquelle un jeton est ajouté au bucket (refillPeriod) :
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
Maintenant, vous vous demandez peut-être : "comment un jeton est ajouté au bucket ?". Pour cela, lorsqu'un bucket est créé, une tâche cron est démarrée, et à chaque milliseconde de rechargePeriod, un nouveau token est ajouté au bucket :
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] subEnfin, lorsqu'un client souhaite un token du bucket, la fonction waitAvailable doit être appelée :
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock()Inspiré par https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3