Controlling outgoing rate limit

Published on 2024-07-30

Imagine a distributed application that interacts with a third-party API. Third-party APIs usually enforce a rate limit to keep their clients from sending bursts of requests and causing downtime on their services. In such a scenario, how can the caller control the rate of outgoing requests to the third-party API in a distributed environment? This post discusses a possible strategy for this problem.

There are multiple algorithms to control the rate of requests, but here we'll focus on the token bucket algorithm because it is relatively easy to understand and implement. The algorithm works as follows: a bucket can hold a maximum of T tokens, and when an application wants to make a request to the third-party API, it has to take 1 token from the bucket. If the bucket is empty, it has to wait until there is at least 1 token in the bucket. The bucket is also refilled one token at a time, at a fixed rate of R tokens per millisecond, up to its maximum of T tokens.
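The rules above can be sketched in a few lines of Go. This is only an illustration of the algorithm in a single process: the `bucket` type and its `refill` and `tryTake` methods are hypothetical names, and the refill is triggered by hand instead of by a timer so the behavior is easy to follow.

```go
package main

import "fmt"

// bucket is a hypothetical, single-process token bucket used only for illustration.
type bucket struct {
	tokens    int // tokens currently available
	maxTokens int // capacity T
}

// refill adds one token without ever exceeding the capacity.
// In a real limiter this would be called once per refill interval.
func (b *bucket) refill() {
	if b.tokens < b.maxTokens {
		b.tokens++
	}
}

// tryTake removes one token and reports whether the request may proceed.
func (b *bucket) tryTake() bool {
	if b.tokens == 0 {
		return false // bucket is empty: the caller has to wait
	}
	b.tokens--
	return true
}

func main() {
	b := &bucket{tokens: 2, maxTokens: 2}

	fmt.Println(b.tryTake()) // true
	fmt.Println(b.tryTake()) // true
	fmt.Println(b.tryTake()) // false: no tokens left

	b.refill()
	fmt.Println(b.tryTake()) // true: one token was refilled
}
```

Here an empty bucket simply returns false. A limiter that makes the caller wait, as described above, would block until the next refill instead.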

The token bucket algorithm is straightforward to understand, but how can it be used in a distributed environment to control outgoing requests to third-party APIs?

To control the outgoing rate limit in a distributed environment, a centralized source of truth for the current rate limit is necessary. There are multiple ways to implement this source of truth, and I've come up with the following diagram of a possible implementation:

[Figure: Controlling outgoing rate limit]

In the figure above, we have a distributed application running in multiple pods, and each pod can make requests to a third-party API. In the application infrastructure, there is a TCP server that controls the rate limit by using the token bucket algorithm. Before making a request to the third-party API, a pod asks the TCP server for a new token and waits for the server's response until at least one token is available. Once a token is available, the pod makes the request to the third-party API.

The TCP server implementation can be found in the repository https://github.com/rafaquelhodev/rlimit/ and in the next section I'll briefly discuss the token bucket implementation in Go.

Token bucket implementation

Below are the main ideas behind the token bucket implementation. Please take a look at the https://github.com/rafaquelhodev/rlimit/ repository for the detailed implementation.

The rate limit control is centralized in the TokenBucket struct:

type TokenBucket struct {
    id           string
    mu           sync.Mutex
    tokens       int64
    maxTokens    int64
    refillPeriod int64
    cron         chan bool
    subs         []chan bool
}

Notice the subs property in the TokenBucket struct. It is a slice of subscribers for a specific token bucket: every time a client requests a token and none is available, the client is added to the subs slice, and it is notified when a new token is added to the bucket.
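The subscribe-and-notify idea can be shown in isolation with a small sketch. The `waitQueue` type and its methods are hypothetical and are not the repository code. One deliberate difference: the channels here are buffered, so notifying never blocks on a slow waiter.

```go
package main

import (
	"fmt"
	"sync"
)

// waitQueue is a hypothetical, stripped-down stand-in for the subs slice.
type waitQueue struct {
	mu   sync.Mutex
	subs []chan bool
}

// subscribe registers a new waiter and returns the channel it should block on.
func (q *waitQueue) subscribe() chan bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	ch := make(chan bool, 1) // buffered: notify can send without waiting
	q.subs = append(q.subs, ch)
	return ch
}

// notify wakes the oldest waiter, if any, and reports whether one was woken.
func (q *waitQueue) notify() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.subs) == 0 {
		return false
	}
	sub := q.subs[0]
	q.subs = q.subs[1:]
	sub <- true
	return true
}

func main() {
	q := &waitQueue{}
	first := q.subscribe()
	second := q.subscribe()

	q.notify() // a token arrives: only the first waiter is woken
	<-first
	fmt.Println("first client got a token")

	select {
	case <-second:
		fmt.Println("second client got a token")
	default:
		fmt.Println("second client is still waiting")
	}
}
```

Serving waiters from the front of the slice gives first-come, first-served ordering, so a client that has waited longest gets the next token.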

When creating the bucket, we need to provide the maximum number of tokens the bucket can hold (maxTokens) and the interval, in milliseconds, at which a token is added to the bucket (refillPeriod):

func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
    bucket := &TokenBucket{
        id:           id,
        tokens:       0,
        maxTokens:    maxTokens,
        refillPeriod: refillPeriod,
        cron:         make(chan bool),
        subs:         make([]chan bool, 0),
    }
    fmt.Printf("refill period  = %d\n", refillPeriod)
    bucket.startCron()
    return bucket
}

Now you might wonder how a token is added to the bucket. When a bucket is created, a cron job is started, and every refillPeriod milliseconds a new token is added to the bucket:

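func (tb *TokenBucket) startCron() {
    ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)

    // The loop body was lost in extraction. What follows is a reconstruction
    // (an assumption) from the surrounding text; see the repository for the original.
    go func() {
        for {
            select {
            case <-tb.cron: // assumed: a message on cron stops the refill loop
                ticker.Stop()
                return
            case <-ticker.C:
                tb.mu.Lock()
                if len(tb.subs) > 0 {
                    // Hand the new token straight to the oldest waiting client.
                    sub := tb.subs[0]
                    tb.subs = tb.subs[1:]
                    sub <- true
                } else if tb.tokens < tb.maxTokens {
                    tb.tokens += 1
                }
                tb.mu.Unlock()
            }
        }
    }()
}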

Finally, when a client wants a token from the bucket, the waitAvailable function must be called:

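func (tb *TokenBucket) waitAvailable() bool {
    tb.mu.Lock()

    // Fast path: a token is available, so consume it right away.
    if tb.tokens > 0 {
        fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
        tb.tokens -= 1
        tb.mu.Unlock()
        return true
    }

    fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)

    // No token available: join the subs queue (tokenSubscribe is defined in the repository).
    ch := tb.tokenSubscribe()

    tb.mu.Unlock()

    // The rest of this function was lost in extraction. Assumed ending:
    // block until the refill loop notifies this subscriber.
    <-ch

    return true
}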

Inspired by https://github.com/Mohamed-khattab/Token-bucket-rate-limiter

This article is reproduced from: https://dev.to/rafaquelhodev/controlling-outgoing-rate-limit-3klg?1