Vamos imaginar um cenário em que se tenha um aplicativo distribuído que interage com uma API de terceiros. Normalmente, APIs de terceiros possuem um mecanismo de controle de limite de taxa para evitar que seus clientes interrompam solicitações e causem tempo de inatividade em seus serviços. Nesse cenário, como o chamador pode controlar a taxa de solicitações de saída para a API de terceiros em um ambiente distribuído? Este post discute uma possível estratégia para este problema.
Existem vários algoritmos para controlar a taxa de solicitações, mas aqui vamos nos concentrar no algoritmo de token bucket, porque é relativamente fácil de entender e implementar. Este algoritmo afirma que: um bucket pode conter no máximo T tokens, e quando um aplicativo deseja fazer uma solicitação à API de terceiros, ele deve levar 1 token do balde. Se o balde estiver vazio, ele deverá esperar até que haja pelo menos 1 token no balde. Além disso, o balde é recarregado com 1 token a uma taxa fixa de R tokens/milissegundos.
O algoritmo de token bucket é muito simples de entender, mas como alguém pode usá-lo em um ambiente distribuído para controlar a solicitação de saída para APIs de terceiros?
Se alguém quiser controlar o limite de taxa de saída em um ambiente distribuído, é necessária uma fonte centralizada de verdade para o limite de taxa atual. Existem múltiplas maneiras de implementar a fonte da verdade e idealizei o seguinte diagrama com uma possível implementação:
Na figura acima, temos um aplicativo distribuído em vários pods, e cada pod pode fazer solicitações a uma API de terceiros. Na infraestrutura do aplicativo, existe um servidor TCP que controla o limite de taxa usando o algoritmo de token bucket. Antes de fazer uma solicitação à API de terceiros, o pod solicita um novo token ao servidor TCP e aguarda uma resposta do servidor TCP até que haja pelo menos um token disponível. Depois que um token estiver disponível, o pod fará a solicitação à API de terceiros.
A implementação do servidor TCP pode ser encontrada neste repositório https://github.com/rafaquelhodev/rlimit/ e na próxima seção discutirei brevemente a implementação do token bucket em golang.
Abaixo, estou mostrando as principais ideias por trás da implementação do token bucket. Por favor, dê uma olhada no repositório https://github.com/rafaquelhodev/rlimit/ para entender a implementação detalhada.
O controle do limite de taxa é centralizado na estrutura TokenBucket:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
Você pode notar que há uma propriedade subs na estrutura TokenBucket. Basicamente, este é um array de assinantes para um token bucket específico: toda vez que um token é solicitado de um cliente, o cliente é adicionado ao array subs e o cliente é notificado quando um novo token é adicionado ao bucket.
Ao iniciar o bucket, precisamos fornecer um número máximo de tokens que o bucket pode suportar (maxTokens) e a quantidade de tempo que um token é adicionado ao bucket (refillPeriod):
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
Agora, você pode se perguntar: "como um token é adicionado ao bucket?". Para isso, quando um bucket é criado, um cron job é iniciado, e a cada refillPeriod milissegundos, um novo token é adicionado ao bucket:
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] subFinalmente, quando um cliente deseja um token do bucket, a função waitAvailable deve ser chamada:
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock()
Inspirado em https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3