

tatsuya6502 commented on August 16, 2024

Thank you for explaining. I have a better understanding of the issue now.

  1. Ensure that multiple update operations cannot happen at once. This is to prevent unnecessary load to the upstream service by fetching the exact same data multiple times.

This is done by and_try_compute_with. It guarantees that only one update operation happens at a time for the same key.

as long as 'B's function early returns (so it has to know that A just ran).

OK. So this is why you wanted the update operations from A and B to be coalesced.

I think what you really want is to ensure that the update operation for the same key does not happen more than once within a given time frame. (B should return early if its f is called within X seconds after A has updated the value; the two calls do not have to overlap.) This cannot be handled by coalescing, because B may be called after A has already updated the value. They happen at separate times, so they cannot be coalesced.

You can achieve this by storing the last update time of the value in the cache and using and_try_compute_with. The following code snippet demonstrates this:

// Cargo.toml
//
// [dependencies]
// futures-util = "0.3.30"
// moka = { version = "0.12.7", features = ["future"] }
// tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros", "sync", "time"] }

use std::{sync::Arc, time::{Duration, Instant}};

use moka::future::Cache;

pub type Key = String;
pub type Value = String;

#[derive(Clone, Debug)]
pub struct ValueAndUpdatedAt<V> {
    /// The value.
    #[allow(unused)]
    value: V,
    /// The time when the value was last updated.
    updated_at: Instant,
}

async fn refresh(
    cache: &Cache<Key, ValueAndUpdatedAt<Value>>,
    key: &Key,
) -> Result<(), Box<String>> {
    use moka::ops::compute::{CompResult, Op};

    let key = key.clone();

    let result = cache
        .entry(key.clone())
        .and_try_compute_with(move |maybe_entry| async move {
            if let Some(entry) = maybe_entry {
                // Return early if the entry was updated less than 8 seconds ago.
                if entry.into_value().updated_at.elapsed() < Duration::from_secs(8) {
                    return Ok(Op::Nop) as Result<_, Box<String>>;
                }
            }

            // If we are here, `maybe_entry` should be `None` or the entry was
            // updated 8 or more seconds ago. Get the latest value from the
            // upstream service and insert it into this cache.
            //
            // Since `and_try_compute_with` ensures that only one task can enter
            // this async block at a time for the same key, we can be sure that
            // the value in the upstream service is retrieved only once.
            let value = get_latest_from_upstream(key).await?;

            // Update the value in the cache.
            Ok(Op::Put(ValueAndUpdatedAt {
                value,
                updated_at: Instant::now(),
            }))
        })
        .await;

    match result {
        Err(e) => {
            eprintln!("Error: {e:?}");
            return Err(e);
        }
        Ok(CompResult::Unchanged(_entry)) => {
            println!(
                "  Value existed in the cache. Did not update the value \
                because not enough time has passed since the last update."
            );
        }
        Ok(CompResult::ReplacedWith(_entry)) => {
            println!(
                "  Value existed in the cache. \
                Updated the cached value with the latest one from the upstream service."
            );
        }
        Ok(CompResult::Inserted(_entry)) => {
            println!(
                "  Value did not exist in the cache. \
                Inserted the latest value from the upstream service."
            );
        }
        Ok(CompResult::StillNone(_key)) => unreachable!(),
        Ok(CompResult::Removed(_entry)) => unreachable!(),
    }

    Ok(())
}

async fn get_latest_from_upstream(_key: Key) -> Result<Value, Box<String>> {
    // Simulate an upstream service that takes some time to respond
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("value".to_string())
}

#[tokio::main]
async fn main() -> Result<(), Box<String>> {
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(360))
        .build();

    let key = "key".to_string();

    println!("Refreshing the value in the cache.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 10 seconds...");
    tokio::time::sleep(Duration::from_secs(10)).await;

    println!("\nRun two async tasks concurrently and try to refresh the value for the same key.");
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    let handles = (0..2).map(|_| {
        let cache = cache.clone();
        let key = key.clone();
        let barrier = barrier.clone();

        tokio::spawn(async move {
            barrier.wait().await;
            refresh(&cache, &key).await
        })
    });

    // Wait for all tasks to complete.
    futures_util::future::join_all(handles).await;
    println!("\nAll tasks completed.");

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nTry to refresh the value for the key.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key again.");
    refresh(&cache, &key).await?;

    Ok(())
}

$ cargo run

Refreshing the value in the cache.
  Value did not exist in the cache. Inserted the latest value from the upstream service.

Sleeping for 10 seconds...

Run two async tasks concurrently and try to refresh the value for the same key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.
  Value existed in the cache. Did not update the value because not enough time has passed since the last update.

All tasks completed.

Sleeping for 5 seconds...

Try to refresh the value for the key.
  Value existed in the cache. Did not update the value because not enough time has passed since the last update.

Sleeping for 5 seconds...

Refresh the value for the key again.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.

Actually, the cache already stores the last update time of each key, but it is not exposed to users. When I have more time, I will expose it via a new method on Entry. (There is a plan to add a metadata() method to Entry; see #312.)

from moka.

ben-manes commented on August 16, 2024

This is a bit hard to understand, but please review Caffeine to see if anything it does helps refine your approach.

Its refreshAfterWrite asynchronously reloads an entry if it is accessed after a time threshold, and returns the currently cached value. This way active content stays fresh, inactive content is allowed to expire, and the reload penalty is hidden from callers. While this happens individually per key, a coalescing loader can batch reloads over a time/space window for a more efficient operation. In-flight reloads are stored in a secondary mapping, are invalidated if the mapping was modified (linearizability), and deduplicate concurrent calls.
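To make the refreshAfterWrite description concrete, here is a minimal std-only sketch. It is not Caffeine's or moka's implementation: the type `RefreshAfterWrite`, its methods, and the choice of one background thread per reload are hypothetical. A stale read returns the cached value at once, and a `HashSet` of in-flight keys plays the role of the secondary mapping that deduplicates reloads. Coalescing, and invalidating an in-flight reload when the entry changes, are not modeled.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical refresh-after-write map (illustration only).
pub struct RefreshAfterWrite<V> {
    /// key -> (value, time the value was written)
    entries: Mutex<HashMap<String, (V, Instant)>>,
    /// Keys with a reload currently running (the "secondary mapping").
    in_flight: Mutex<HashSet<String>>,
    refresh_after: Duration,
}

impl<V: Clone + Send + 'static> RefreshAfterWrite<V> {
    pub fn new(refresh_after: Duration) -> Arc<Self> {
        Arc::new(Self {
            entries: Mutex::new(HashMap::new()),
            in_flight: Mutex::new(HashSet::new()),
            refresh_after,
        })
    }

    pub fn insert(&self, key: &str, value: V) {
        let mut entries = self.entries.lock().unwrap();
        entries.insert(key.to_string(), (value, Instant::now()));
    }

    /// Returns the cached value immediately, even if it is stale. If the entry
    /// is older than `refresh_after` and no reload is in flight for the key,
    /// `loader` runs on a background thread and its result replaces the entry.
    pub fn get<F>(self: &Arc<Self>, key: &str, loader: F) -> Option<V>
    where
        F: FnOnce(&str) -> V + Send + 'static,
    {
        let (value, written_at) = self.entries.lock().unwrap().get(key).cloned()?;
        let stale = written_at.elapsed() >= self.refresh_after;
        // `HashSet::insert` returns false when a reload is already running,
        // so concurrent stale reads trigger only one reload per key.
        if stale && self.in_flight.lock().unwrap().insert(key.to_string()) {
            let this = Arc::clone(self);
            let key = key.to_string();
            thread::spawn(move || {
                let fresh = loader(key.as_str());
                this.insert(&key, fresh);
                this.in_flight.lock().unwrap().remove(&key);
            });
        }
        Some(value)
    }
}
```

Callers never wait for the loader; they see the new value only after the background reload has finished.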

A common source of confusion is users who instead want to periodically reload the entire cache contents. That's not really a cache but a warm replica, as no eviction should ever take place. It is simply a scheduled task that recreates an immutable map, so it is clearer to write that code directly without needing a library.
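A sketch of that "warm replica" alternative, assuming the whole data set can be rebuilt by one hypothetical `load` function: a background thread periodically builds a fresh immutable `HashMap` and swaps it in behind an `RwLock<Arc<...>>`. `WarmReplica` and `spawn_periodic_reload` are illustrative names, not part of any library.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Hypothetical warm replica: readers share an immutable snapshot, and a
/// scheduled job rebuilds the whole map and swaps it in. Nothing is evicted.
pub struct WarmReplica<K, V> {
    snapshot: RwLock<Arc<HashMap<K, V>>>,
}

impl<K, V> WarmReplica<K, V> {
    pub fn new(initial: HashMap<K, V>) -> Self {
        Self { snapshot: RwLock::new(Arc::new(initial)) }
    }

    /// Cheap for readers: clones the `Arc`, not the map.
    pub fn snapshot(&self) -> Arc<HashMap<K, V>> {
        self.snapshot.read().unwrap().clone()
    }

    /// Builds the new map before taking the write lock, then swaps it in.
    /// Readers still holding an older snapshot are unaffected.
    pub fn reload_with<F: FnOnce() -> HashMap<K, V>>(&self, load: F) {
        let fresh = Arc::new(load());
        *self.snapshot.write().unwrap() = fresh;
    }
}

/// The "scheduled task": calls `load` every `every` on a background thread.
pub fn spawn_periodic_reload<K, V, F>(replica: Arc<WarmReplica<K, V>>, every: Duration, load: F)
where
    K: Send + Sync + 'static,
    V: Send + Sync + 'static,
    F: Fn() -> HashMap<K, V> + Send + 'static,
{
    thread::spawn(move || loop {
        thread::sleep(every);
        replica.reload_with(&load);
    });
}
```

Because the new map is built before the write lock is taken, readers are blocked only for the pointer swap.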


tatsuya6502 commented on August 16, 2024

Here is another example. This one only checks whether B was blocked by A. If so, B returns early.

Like the previous example, this stores the last update time of the value in the cache and uses and_try_compute_with. Only the early return condition was changed:

// Cargo.toml
//
// [dependencies]
// futures-util = "0.3.30"
// moka = { version = "0.12.7", features = ["future"] }
// tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros", "sync", "time"] }

use std::{sync::Arc, time::{Duration, Instant}};

use moka::future::Cache;

pub type Key = String;
pub type Value = String;

#[derive(Clone, Debug)]
pub struct ValueAndUpdatedAt<V> {
    /// The value.
    #[allow(unused)]
    value: V,
    /// The time when the value was last updated.
    updated_at: Instant,
}

async fn refresh(
    cache: &Cache<Key, ValueAndUpdatedAt<Value>>,
    key: &Key,
) -> Result<(), Box<String>> {
    use moka::ops::compute::{CompResult, Op};

    let key = key.clone();
    let started_at = Instant::now();

    let result = cache
        .entry(key.clone())
        .and_try_compute_with(move |maybe_entry| async move {
            if let Some(entry) = maybe_entry {
                // Return early if we were blocked by another async task
                // updating the value for the same key.
                //
                // We can detect this by checking whether our `started_at` is
                // earlier than or equal to the time the other task updated the entry.
                if started_at <= entry.into_value().updated_at {
                    return Ok(Op::Nop) as Result<_, Box<String>>;
                }
            }

            // If we are here, `maybe_entry` should be `None` or the entry was
            // updated _before_ we started the computation.
            //
            // Since `and_try_compute_with` ensures that only one task can enter
            // this block at a time for the same key, we can be sure that the
            // value for the key is fetched from the upstream service only once.
            let value = get_latest_from_upstream(key).await?;

            // Update the value in the cache.
            Ok(Op::Put(ValueAndUpdatedAt {
                value,
                updated_at: Instant::now(),
            }))
        })
        .await;

    match result {
        Err(e) => {
            eprintln!("Error: {e:?}");
            return Err(e);
        }
        Ok(CompResult::Unchanged(_entry)) => {
            println!(
                "  Value existed in the cache. Did not update the value \
                because it was already done by another async task running concurrently."
            );
        }
        Ok(CompResult::ReplacedWith(_entry)) => {
            println!(
                "  Value existed in the cache. \
                Updated the cached value with the latest one from the upstream service."
            );
        }
        Ok(CompResult::Inserted(_entry)) => {
            println!(
                "  Value did not exist in the cache. \
                Inserted the latest value from the upstream service."
            );
        }
        Ok(CompResult::StillNone(_key)) => unreachable!(),
        Ok(CompResult::Removed(_entry)) => unreachable!(),
    }

    Ok(())
}

async fn get_latest_from_upstream(_key: Key) -> Result<Value, Box<String>> {
    // Simulate an upstream service that takes some time to respond
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("value".to_string())
}

#[tokio::main]
async fn main() -> Result<(), Box<String>> {
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(360))
        .build();

    let key = "key".to_string();

    println!("Refreshing the value in the cache.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 10 seconds...");
    tokio::time::sleep(Duration::from_secs(10)).await;

    println!("\nRun two async tasks concurrently and try to refresh the value for the same key.");
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    let handles = (0..2).map(|_| {
        let cache = cache.clone();
        let key = key.clone();
        let barrier = barrier.clone();

        tokio::spawn(async move {
            barrier.wait().await;
            refresh(&cache, &key).await
        })
    });

    // Wait for all tasks to complete.
    futures_util::future::join_all(handles).await;
    println!("\nAll tasks completed.");

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key again.");
    refresh(&cache, &key).await?;

    Ok(())
}

$ cargo run

Refreshing the value in the cache.
  Value did not exist in the cache. Inserted the latest value from the upstream service.

Sleeping for 10 seconds...

Run two async tasks concurrently and try to refresh the value for the same key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.
  Value existed in the cache. Did not update the value because it was already done by another async task running concurrently.

All tasks completed.

Sleeping for 5 seconds...

Refresh the value for the key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.

Sleeping for 5 seconds...

Refresh the value for the key again.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.


ShiromMakkad commented on August 16, 2024

Thank you so much. I'll implement this example!


tatsuya6502 commented on August 16, 2024

Hi. Thank you for using moka.

I'm looking for a function that has the same behavior as and_try_compute_with but the same concurrency as or_insert_with.

I believe you can use and_try_compute_with method for your use case.

Maybe the documentation is a bit confusing, but and_try_compute_with and or_insert_with actually have the same concurrency. They both use a key-level lock and never evaluate more than one future at a time for the same key.

or_insert_with:

Only one of the calls evaluates its future (...), and other calls wait for that future to resolve (...).

Here is how or_insert_with works under the hood:

  1. Two async tasks A and B call or_insert_with on the same key at the same time.
  2. Only one of them, let's say A, can acquire an exclusive writer lock on the key-level RwLock.
  3. B fails to acquire the writer lock, so it tries to get a shared reader lock instead. Since A still holds the writer lock, B has to wait for A to release it.
  4. A's init future is evaluated.
  5. When the future resolves to a value V, A inserts V into the cache and also stores it in the slot protected by the writer lock.
  6. A releases the writer lock.
  7. Now B acquires a shared reader lock and gets V protected by the lock.
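The or_insert_with steps above can be modeled with a std `RwLock<Option<V>>` standing in for the key-level lock. This is a simplified synchronous model using threads instead of async tasks, not moka's internal code, and `get_or_init` is a hypothetical name. The comments map to the numbered steps.

```rust
use std::sync::RwLock;

/// Simplified model of the key-level lock behind `or_insert_with`.
/// `slot` stands in for the per-key RwLock; `None` means "not computed yet".
pub fn get_or_init<V: Clone, F: FnOnce() -> V>(slot: &RwLock<Option<V>>, init: F) -> V {
    // Step 2: try to become the (only) writer for this key.
    if let Ok(mut guard) = slot.try_write() {
        if let Some(v) = guard.as_ref() {
            return v.clone(); // an earlier writer already stored V
        }
        // Steps 4-5: evaluate `init` and store V where the lock protects it.
        let v = init();
        *guard = Some(v.clone());
        return v; // Step 6: dropping `guard` releases the writer lock.
    }
    // Step 3: the writer lock is taken, so block on a shared reader lock.
    // Step 7: once the writer has released it, read the V it stored.
    let guard = slot.read().unwrap();
    guard.clone().expect("the writer stores V before releasing the lock")
}
```

However many threads call it at once, `init` runs exactly once; the others either wait as readers or find V already stored.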

Here is how and_try_compute_with works under the hood:

  1. Two async tasks A and B call and_try_compute_with on the same key at the same time.
  2. Only one of them, let's say A, can acquire an exclusive writer lock on the key-level RwLock.
  3. Since A holds the writer lock, B has to wait for A to release it.
  4. A's f closure is evaluated, and then the returned future is evaluated.
  5. When the future resolves and, let's say, returns Op::Put(V), A inserts V into the cache.
  6. A releases the writer lock.
  7. Now B acquires the exclusive writer lock.
  8. B's f closure is evaluated, and then the returned future is evaluated.
  9. When the future resolves and, let's say, returns Op::Nop (no-operation), B gets the existing V inserted by A.

In both cases, B has to wait for A to finish evaluating the future. (The same concurrency.)

In and_try_compute_with's case, B's f closure is evaluated at step 8, but if it returns std::future::ready(Ok(Op::Nop)), the future resolves immediately and the stored value V is returned. The time spent resolving the future will be negligible.
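The and_try_compute_with steps can be modeled the same way with a std `Mutex<Option<V>>`: every caller takes the exclusive lock in turn, so B's closure still runs, but only after A is done, and it can cheaply return `Op::Nop`. The `Op` enum here is a hypothetical two-variant stand-in for moka's `Op`, and `compute_with` is a synchronous illustration, not moka's API.

```rust
use std::sync::Mutex;

/// Hypothetical two-variant stand-in for moka's `Op`.
#[derive(Debug)]
pub enum Op<V> {
    Nop,
    Put(V),
}

/// Simplified model of `and_try_compute_with` (threads instead of async tasks).
/// Each caller takes the exclusive per-key lock in turn, so every caller's `f`
/// runs, but never two at the same time for the same key.
pub fn compute_with<V, F, E>(slot: &Mutex<Option<V>>, f: F) -> Result<Option<V>, E>
where
    V: Clone,
    F: FnOnce(Option<V>) -> Result<Op<V>, E>,
{
    // Steps 2-3 and 7: block until this caller holds the exclusive lock.
    let mut guard = slot.lock().unwrap();
    // Steps 4 and 8: evaluate `f` with a copy of the current value, if any.
    // On `Err`, return early and leave the stored value untouched.
    match f(guard.clone())? {
        Op::Put(v) => *guard = Some(v), // step 5
        Op::Nop => {}                   // step 9: keep the value A stored
    }
    // Step 6: the lock is released when `guard` goes out of scope.
    Ok(guard.clone())
}
```

In this model B's closure is always called (two evaluations for two callers), but only A performs the expensive work.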


tatsuya6502 commented on August 16, 2024

Also, one question about this approach: Can multiple calls to remove at the same time return copies of the value, or will only one call get the value and all others get None back?

It should be the latter one; only one call gets the value and all others get None back.
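A small std-only illustration of that guarantee, using a `Mutex<HashMap>` as a stand-in for the cache (moka's internals differ; this only models the observable behavior): several threads race to remove the same key, and exactly one of them receives the value. `count_successful_removes` is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical helper: `threads` threads race to remove the same key from a
/// shared map, and we count how many of them actually received the value.
pub fn count_successful_removes(threads: usize) -> usize {
    let map = Arc::new(Mutex::new(HashMap::from([("key", "value")])));
    let barrier = Arc::new(Barrier::new(threads));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let (map, barrier) = (Arc::clone(&map), Arc::clone(&barrier));
            thread::spawn(move || {
                barrier.wait(); // start all removals at (roughly) the same time
                let mut guard = map.lock().unwrap();
                // `remove` is atomic under the lock: the first caller gets
                // Some(value), every later caller gets None.
                guard.remove("key")
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(Option::is_some)
        .count()
}
```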


ShiromMakkad commented on August 16, 2024

Thanks for your response. So I can either go with my current solution (although it doesn't preserve the TTL, and I could lose the key if the update fails) or I can have B resolve immediately.

Is there a way that I can say if(this closure had to block and wait for A) return Op::Nop at the top of my closure? Or do I need to track that external to moka? The behavior I'm looking for is similar to or_insert_with where B acquires a shared reader lock rather than evaluating its f closure, but any solution where B doesn't make a network call works.


tatsuya6502 commented on August 16, 2024

Is there a way that I can say if(this closure had to block and wait for A) return Op::Nop at the top of my closure?

No. Currently, moka does not provide a way to know whether the closure is going to block. (The same applies to or_insert_with.)

I am not sure how your refresh-ahead cache would work, but I guess that instead of B being blocked from step 3 to step 7 below, you want B to return immediately with the current V in the cache?

  3. Since A still holds the writer lock, B has to wait for A to release the lock.
  4. A's f closure is evaluated, and then the returned future is evaluated.
  5. When the future resolves and, let's say, returns Op::Put(V), A inserts V into the cache.
  6. A releases the writer lock.
  7. Now B acquires the exclusive writer lock.
  8. B's f closure is evaluated, and then the returned future is evaluated.
  9. When the future resolves and, let's say, returns Op::Nop (no-operation), B gets the existing V inserted by A.
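The "return immediately" behavior can be sketched with `Mutex::try_lock`: if another caller currently holds the per-key lock, B returns at once instead of waiting at step 3. This is a hypothetical std-only model of the requested behavior, not an existing moka method; `compute_or_skip` and `Outcome` are illustrative names.

```rust
use std::sync::{Mutex, TryLockError};

/// Result of a hypothetical non-blocking compute.
#[derive(Debug, PartialEq)]
pub enum Outcome<V> {
    /// We held the lock and stored the value returned by `f`.
    Updated(V),
    /// Another caller was updating the same key, so we returned immediately.
    Skipped,
}

/// Hypothetical sketch: instead of waiting for A's lock (step 3), B gives up
/// right away, so its `f` (and any network call inside it) never runs.
pub fn compute_or_skip<V, F>(slot: &Mutex<Option<V>>, f: F) -> Outcome<V>
where
    V: Clone,
    F: FnOnce(Option<&V>) -> V,
{
    match slot.try_lock() {
        Ok(mut guard) => {
            let v = f(guard.as_ref());
            *guard = Some(v.clone());
            Outcome::Updated(v)
        }
        Err(TryLockError::WouldBlock) => Outcome::Skipped,
        Err(TryLockError::Poisoned(e)) => panic!("a previous update panicked: {e}"),
    }
}
```

The trade-off: a skipped caller does not learn the value A is about to store; it would have to read the cache afterwards.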


ShiromMakkad commented on August 16, 2024

I don't really mind if 'B' blocks during steps 3-7 but yes, returning immediately would be the ideal behavior.

I've got a separate function for getting data. The update function needs to:

  1. Return whether the update completed successfully and, if there was an error, return that error.
  2. Ensure that multiple update operations cannot happen at once. This is to prevent unnecessary load to the upstream service by fetching the exact same data multiple times.
  3. I don't need update to return the value stored in the cache. I've got a separate function for that.

'B' blocking while 'A' is updating isn't ideal, but it's acceptable as long as 'B's function early returns (so it has to know that A just ran).

Considering the behavior of '.remove()', this should be possible as-is, but it would be convenient if there were a ready-made function for this so I can preserve TTLs.


tatsuya6502 commented on August 16, 2024

Right now, I'm simply removing the key before running the closure, but this resets the TTL on the value. I don't want to do that.

In the above code snippet, and_try_compute_with resets the TTL of the value when it is updated. If you want to keep the TTL of the value, you can use per-entry expiration instead of the global TTL for the cache. (See moka's per-entry expiration example and the API documentation for the Expiry trait.)

  • Make expire_after_create return the TTL as Some(Duration) when the value is newly inserted.
  • Make expire_after_update return the passed duration_until_expiry, so the TTL is not reset when the value itself is updated.
  • Make expire_after_read return the passed duration_until_expiry, so the TTL is not reset when the value is read.
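The effect of those three hooks on the deadline can be shown with a small std-only model. `ExpiryHooks` below is a hypothetical stand-in that only borrows the hook names from moka's Expiry trait; the real methods also receive the key, the value, and timestamps, and moka tracks deadlines internally. What it shows: if the update and read hooks return `duration_until_expiry` unchanged, the deadline set at creation stays fixed.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for moka's `Expiry` trait: same hook names, but
/// reduced to the TTL arithmetic (the real hooks take more arguments).
pub trait ExpiryHooks {
    /// TTL for a newly inserted value (`None` = never expires).
    fn expire_after_create(&self) -> Option<Duration>;

    /// Returning the remaining duration unchanged keeps the current deadline.
    fn expire_after_update(&self, duration_until_expiry: Option<Duration>) -> Option<Duration> {
        duration_until_expiry
    }

    /// Same for reads: the deadline is left as it is.
    fn expire_after_read(&self, duration_until_expiry: Option<Duration>) -> Option<Duration> {
        duration_until_expiry
    }
}

/// TTL counted from the first insertion only.
pub struct TtlFromCreation(pub Duration);

impl ExpiryHooks for TtlFromCreation {
    fn expire_after_create(&self) -> Option<Duration> {
        Some(self.0)
    }
}

/// A value with an optional absolute expiration deadline.
pub struct Entry<V> {
    pub value: V,
    pub expires_at: Option<Instant>,
}

impl<V> Entry<V> {
    pub fn create(policy: &impl ExpiryHooks, value: V, now: Instant) -> Self {
        let expires_at = policy.expire_after_create().map(|ttl| now + ttl);
        Entry { value, expires_at }
    }

    /// Time left until the deadline, as passed to the update/read hooks.
    fn remaining(&self, now: Instant) -> Option<Duration> {
        self.expires_at.map(|at| at.saturating_duration_since(now))
    }

    pub fn update(&mut self, policy: &impl ExpiryHooks, value: V, now: Instant) {
        let remaining = self.remaining(now);
        self.value = value;
        self.expires_at = policy.expire_after_update(remaining).map(|d| now + d);
    }

    pub fn read(&mut self, policy: &impl ExpiryHooks, now: Instant) -> &V {
        let remaining = self.remaining(now);
        self.expires_at = policy.expire_after_read(remaining).map(|d| now + d);
        &self.value
    }
}
```

By contrast, a cache-wide time_to_live behaves as if the update hook returned the full TTL again, which is the reset the question wants to avoid.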


tatsuya6502 commented on August 16, 2024

I don't really mind if 'B' blocks during steps 3-7 but yes, returning immediately would be the ideal behavior.

OK. I do not have enough time to implement this right now, but hopefully I will in the near future.

EDIT: Created #433. Please subscribe to the issue if you are interested.


tatsuya6502 commented on August 16, 2024

This is a bit hard to understand, but please review Caffeine to see if anything it does helps refine your approach.

Thank you for the info! I will take a look at refreshAfterWrite.
