Giter Club home page Giter Club logo

leaky-bucket's Introduction

leaky-bucket

github crates.io docs.rs build status

A token-based rate limiter based on the leaky bucket algorithm.

If the bucket overflows and goes over its max configured capacity, the task that tried to acquire the tokens will be suspended until the required number of tokens has been drained from the bucket.

Since this crate uses timing facilities from tokio it has to be used within a Tokio runtime with the time feature enabled.

This library has some neat features, which includes:

Not requiring a background task. This is usually needed by token bucket rate limiters to drive progress. Instead, one of the waiting tasks temporarily assumes the role as coordinator (called the core). This reduces the amount of tasks needing to sleep, which can be a source of jitter for imprecise sleeping implementations and tight limiters. See below for more details.

Dropped tasks release any resources they've reserved. So that constructing and cancellaing asynchronous tasks to not end up taking up wait slots it never uses which would be the case for cell-based rate limiters.


Usage

The core type is the RateLimiter type, which allows for limiting the throughput of a section using its acquire and acquire_one methods.

use leaky_bucket::RateLimiter;
use tokio::time;

let limiter = RateLimiter::builder()
    .max(10)
    .initial(0)
    .refill(5)
    .build();

let start = time::Instant::now();

println!("Waiting for permit...");

// Should take ~400 ms to acquire in total.
let a = limiter.acquire(7);
let b = limiter.acquire(3);
let c = limiter.acquire(10);

let ((), (), ()) = tokio::join!(a, b, c);

println!(
    "I made it in {:?}!",
    time::Instant::now().duration_since(start)
);

Implementation details

Each rate limiter has two acquisition modes. A fast path and a slow path. The fast path is used if the desired number of tokens are readily available, and involves incrementing an atomic counter indicating that the acquired number of tokens have been added to the bucket.

If this counter goes over its configured maximum capacity, it overflows into a slow path. Here one of the acquiring tasks will switch over to work as a core. This is known as core switching.

use leaky_bucket::RateLimiter;
use std::time;

let limiter = RateLimiter::builder()
    .initial(10)
    .interval(time::Duration::from_millis(100))
    .build();

// This is instantaneous since the rate limiter starts with 10 tokens to
// spare.
limiter.acquire(10).await;

// This however needs to core switch and wait for a while until the desired
// number of tokens is available.
limiter.acquire(3).await;

The core is responsible for sleeping for the configured interval so that more tokens can be added. After which it ensures that any tasks that are waiting to acquire including itself are appropriately unsuspended.

On-demand core switching is what allows this rate limiter implementation to work without a coordinating background thread. But we need to ensure that any asynchronous tasks that uses RateLimiter must either run an acquire call to completion, or be cancelled by being dropped.

If none of these hold, the core might leak and be locked indefinitely preventing any future use of the rate limiter from making progress. This is similar to if you would lock an asynchronous Mutex but never drop its guard.

You can run this example with:

cargo run --example block-forever
use leaky_bucket::RateLimiter;
use std::future::Future;
use std::sync::Arc;
use std::task::Context;

struct Waker;

let limiter = Arc::new(RateLimiter::builder().build());

let waker = Arc::new(Waker).into();
let mut cx = Context::from_waker(&waker);

let mut a0 = Box::pin(limiter.acquire(1));
// Poll once to ensure that the core task is assigned.
assert!(a0.as_mut().poll(&mut cx).is_pending());
assert!(a0.is_core());

// We leak the core task, preventing the rate limiter from making progress
// by assigning new core tasks.
std::mem::forget(a0);

// Awaiting acquire here would block forever.
// limiter.acquire(1).await;

Fairness

By default RateLimiter uses a fair scheduler. This ensures that the core task makes progress even if there are many tasks waiting to acquire tokens. As a result it causes more frequent core switching, increasing the total work needed. An unfair scheduler is expected to do a bit less work under contention. But without fair scheduling some tasks might end up taking longer to acquire than expected.

This behavior can be tweaked with the Builder::fair option.

use leaky_bucket::RateLimiter;

let limiter = RateLimiter::builder()
    .fair(false)
    .build();

The unfair-scheduling example can showcase this phenomenon.

cargh run --example unfair-scheduling
# fair
Max: 1011ms, Total: 1012ms
Timings:
 0: 101ms
 1: 101ms
 2: 101ms
 3: 101ms
 4: 101ms
 ...
# unfair
Max: 1014ms, Total: 1014ms
Timings:
 0: 1014ms
 1: 101ms
 2: 101ms
 3: 101ms
 4: 101ms
 ...

As can be seen above the first task in the unfair scheduler takes longer to run because it prioritises releasing other tasks waiting to acquire over itself.

leaky-bucket's People

Contributors

conradludgate avatar k3d3 avatar svend avatar udoprog avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

leaky-bucket's Issues

Cannot create two rate limiters in the same app

I'm trying to create different rate limiters, and it doesn't seem to work. E.g. a simple test:

#[tokio::test]
    async fn test_limited_frequent() {
        let rate_limiter = LeakyBucket::builder()
            .refill_amount(1)
            .refill_interval(Duration::from_millis(100))
            .build()
            .expect("LeakyBucket builder failed");
        let begin = Instant::now();
        for _ in 0..10 {
            rate_limiter.acquire_one().await.expect("No reason to fail");
        }
        let elapsed = Instant::now().duration_since(begin);
        println!("Elapsed: {:?}", elapsed);
        assert!((elapsed.as_secs_f64() - 1.).abs() < 0.1);
    }

    #[tokio::test]
    async fn test_limited_seldom() {
        let rate_limiter = LeakyBucket::builder()
            .refill_amount(1)
            .refill_interval(Duration::from_secs(2))
            .build()
            .expect("LeakyBucket builder failed");
        let begin = Instant::now();
        for _ in 0..2 {
            rate_limiter.acquire_one().await.expect("No reason to fail");
        }
        let elapsed = Instant::now().duration_since(begin);
        println!("Elapsed: {:?}", elapsed);
        // once per 2 seconds => 4 seconds for 2 permits
        assert!((elapsed.as_secs_f64() - 4.).abs() < 0.1);
    }

Each one passes if run independently. But if you run both, then the first one finishes successfully while the second one just hangs forever.

How to call in Future

Hello, I tried adding limit.acquire(8192).await; to "copy.rs poll" but it always prompts me "only allowed inside async functions and blocks", which I know means what.

So I modified some code, but it seems that cx.wake is never called. what should I do to make it work properly.?

let acquire = limit.acquire(8192);
tokio::pin!(acquire);
if acquire.as_mut().poll(cx).is_pending() {
    println!("Pending");
    return Poll::Pending;
}

Add to here:

https://github.com/tokio-rs/tokio/blob/d32ba2cf9d9b7eac3a904f558e5fe4397cc83e89/tokio/src/io/util/copy.rs#L91

Wrong example in readme

In this example (from the README):

use leaky_bucket::RateLimiter;
use std::time;

#[tokio::main]
async fn main() {
    let limiter = RateLimiter::builder()
        .max(10)
        .initial(0)
        .refill(5)
        .build();

    let start = time::Instant::now();

    println!("Waiting for permit...");

    // Should take about 5 seconds to acquire in total.
    let a = limiter.acquire(7);
    let b = limiter.acquire(3);
    let c = limiter.acquire(10);

    let ((), (), ()) = tokio::join!(a, b, c);

    println!(
        "I made it in {:?}!",
        time::Instant::now().duration_since(start)
    );
}

There is this line that says:

// Should take about 5 seconds to acquire in total.

But here it took only 300 millis.

Currently the default value is 100 milliseconds, but it used to be 1 second.

Is that example outdated?

When I insert .interval(std::time::Duration::from_secs(1)) in the builder, it gives me 3 seconds.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.