
rtrb

Real-Time Ring Buffer

A wait-free single-producer single-consumer (SPSC) ring buffer for Rust.

This crate can be used without the standard library (#![no_std]) by disabling the std feature (which is enabled by default); the alloc crate is still required, however.
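For a no_std project, the std feature can be disabled in Cargo.toml like this (a sketch using standard Cargo feature syntax):

```toml
[dependencies]
rtrb = { version = "0.3", default-features = false }
```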

Usage

Add this to your Cargo.toml:

[dependencies]
rtrb = "0.3"

Breaking Changes

For a list of breaking changes and instructions on how to upgrade between released versions, have a look at the changelog.

Performance

Measuring the performance of a data structure for inter-thread communication can be quite brittle and the results depend on many factors. A few performance comparisons between competing crates are shown in issue #39, but like all benchmarks, they are deeply flawed and to be taken with a grain of salt. You should make your own measurements that are relevant to your usage patterns. Feel free to share your own results by commenting on that issue.

Development

Creating the HTML docs (which will be available in target/doc/rtrb/index.html):

cargo doc

Running the tests:

cargo test

Testing the benchmarks (without actually benchmarking):

cargo test --benches

Running the benchmarks (using the criterion crate; results will be available in target/criterion/report/index.html):

cargo bench

Creating flame graphs for the benchmarks; first a few preparations:

cargo install flamegraph
echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid
export CARGO_PROFILE_BENCH_DEBUG=true

Then, creating the flame graph (which will be saved to flamegraph.svg), providing a benchmark (e.g. two_threads), a desired runtime and optionally a benchmark function (e.g. large):

cargo flamegraph --bench two_threads -- --bench --profile-time 10 large

To measure code coverage, nightly Rust is required, as well as a few additional dependencies:

rustup toolchain install nightly
rustup component add llvm-tools-preview
cargo install grcov

Test coverage data can be obtained and analyzed with these commands:

cargo clean
RUSTFLAGS="-Z instrument-coverage" RUSTDOCFLAGS="-Z instrument-coverage -Z unstable-options --persist-doctests target/debug/doctestbins" LLVM_PROFILE_FILE="coverage/%p-%m.profraw" cargo +nightly test
grcov coverage --source-dir . --binary-path target/debug --output-type html --output-path coverage

The last command creates an HTML report in coverage/index.html.

Testing with Miri also needs nightly Rust:

cargo +nightly miri test

This Miri flag should also be tried:

MIRIFLAGS="-Zmiri-preemption-rate=0" cargo +nightly miri test

Running the tests with ThreadSanitizer requires nightly Rust as well:

RUSTFLAGS="-Z sanitizer=thread" cargo +nightly test --tests -Z build-std --target x86_64-unknown-linux-gnu

You might have to adapt the --target option to your system (see e.g. rustup show).

Minimum Supported rustc Version

This crate's minimum supported rustc version (MSRV) is 1.38.0. The MSRV is not expected to be updated frequently, but if it is, there will be (at least) a minor version bump.

Origin Story

The initial code has been ripped off of crossbeam-rs/crossbeam#338, with permission of the PR author.

It has been isolated from the rest of crossbeam with git-filter-repo:

git-filter-repo --subdirectory-filter crossbeam-queue --path src/spsc.rs --path tests/spsc.rs --refs refs/heads/spsc

Alternatives

If you don't like this crate, no problem; there are several alternatives to choose from. Many varieties of ring buffers are available, but here the selection is limited to wait-free SPSC implementations:

  • ach-spsc (using const generics)
  • heapless (for embedded systems, see heapless::spsc)
  • jack (FFI bindings for JACK, see jack::Ringbuffer)
  • magnetic (see magnetic::spsc module)
  • npnc (see npnc::bounded::spsc module)
  • ringbuf (supports const generics and heap allocation)
  • ringbuffer-spsc (using const generics)
  • shmem-ipc (see shmem_ipc::sharedring and shmem_ipc::ringbuf modules)

There are also implementations in other languages.

If you know more alternatives for this list, please open an issue.

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Note that this crate contains a copy of the file cache_padded.rs from https://github.com/crossbeam-rs/crossbeam.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

rtrb's People

Contributors

diwic, mgeier, raftario, ramtype0


rtrb's Issues

Implement more marker traits?

Should the RingBuffer be Send?
I think this might be useful.

It could probably even be Sync, even though this could only be used to get its capacity. But why not?

Could/should the Producer/Consumer be RefUnwindSafe?

What about the other structs?
Are they missing any marker traits?

more elegant way to conditionally pop

I have code like this in two places in my application now:

        while let Ok(AudioEngineToGUIMessage::BufferSizeChanged(_)) = from_audio_rx.peek() {
            if let Ok(AudioEngineToGUIMessage::BufferSizeChanged(frames)) = from_audio_rx.pop() {
                let mut buffer_size_frames = buffer_size_frames.borrow_mut();
                *buffer_size_frames = frames;
            } else {
                unreachable!("AudioEngineToGUIMessage not a BufferSizeChanged message?!")
            }
        }

The while let immediately followed by if let feels verbose and hacky... is there a better way to do this?

Batch-read that can take ownership of consumed items

Can a method to take ownership of the items in a ReadChunk be added? I'm not sure what the best API would be (perhaps a consume_into_iter()?), since taking ownership means commit must be called, but such a method would be crucial for efficiently batch-reading non-Copy types.

(Context: using rtrb as a primitive in a scheduler, so it contains non-Clone non-Copy jobs rather than u8s or anything of that sort)

Consider documenting MSRV policy

I have suspicion that this might become one of the more important crates in the ecosystem. So, it might make sense to:

  • Document minimum supported Rust version (MSRV)
  • Document MSRV policy (how many rustc versions back are generally supported? is MSRV a semver-breaking change?)
  • Add CI job for MSRV

Blocking while popping

Is there a better way to block until you have something to pop other than:

while input.is_empty() {}
let x = input.pop().unwrap();

?

Efficiently Clearing the Buffer via Consumer

Hi thanks for the crate!

I'm looking for advice on what the best way to just drop all of the data in the buffer from the consumers side.

Currently I'm just doing a while let loop on pop() and ignoring the result. Is there a better way to do this? Say in the case where we only want to consume the first 10 elements, drop the rest, and then consume 10 more later?

Why does `ReadChunk.as_slices()` return two slices?

I apologize if this question is silly, but why does ReadChunk.as_slices() return two slices?

I (perhaps naively) expected ReadChunk to have a method .as_slice() which returns a single slice containing all elements in the chunk. I suspect there is a good reason why we use .as_slices() instead.

Perhaps it would be helpful to add an explanation of the motivation behind this API to the doc comments.

Async support

Thanks a lot for this crate. Are there any plans of adding async support?

Avoid dropping elements on consume (idea)

Hi, thanks for the great library!

Here's my use-case: This is a low latency application. Thread A sends elements over a spsc channel to Thread B. The elements contain heap-allocations, e.g: String. (I know, this is funny to have both "low latency" and "heap allocated elems" in the same system, but that's it)

With any popular queue solution, including rtrb, popping takes ownership of the elem, making the consumer eventually drop it (unless it sends the consumed elem back using a different channel). This makes Thread A continuously allocate and Thread B continuously deallocate, eventually making them contend for the same allocator lock. This has measurable effects.

To solve this issue, I ported my C++ queue to rust: https://github.com/erenon/cueue
This has the following differences compared to rtrb:

  • It default inits every elem in the queue (This requires T to be Default)
  • It does not drop consumed elems, but leaves them in the queue as-is. This avoids heap ping-pong.
  • The produced elems are returned to the producer (in the write_chunk): this allows re-use of resources.
  • It employs an mmap trick that makes the queue truly circular. This makes the internal logic a bit simpler, and the reader/writer interface a bit nicer (e.g: reader always returns a single slice, not two).

Perhaps some ideas can be salvaged to make rtrb even better? (I would prefer the widely used rtrb over my tiny lib, if it was possible) Thanks!

Possible data corruption?

In an attempt to use rtrb in my code (mostly through its Read and Write implementations), I've noticed that occasionally a few hundred bytes get lost. How can I debug whether this is an rtrb issue? Has rtrb undergone rigorous loom-based testing, or anything of that sort?

Add ability to query fullness

Hey! First off, thank you for this amazing crate. I'm using it for real-time data processing for a radio telescope, which works wonders.

If possible, I would like a function to query "percent fullness" or something to that end, so I can log warnings when the buffer gets close to full. This would help me catch cases where my consuming code can't keep up with production.

Thanks again!

Ability to "mlock" the buffer?

The JACK ring buffer provides this, see https://jackaudio.org/api/ringbuffer_8h.html#aeb059ff23c65389d48ebbe948f05c2b2.

The libc crate seems to be able to do this:

https://docs.rs/libc/0.2.79/libc/fn.mlock.html
https://docs.rs/libc/0.2.79/libc/fn.munlock.html

It should be simple to add this as a method to RingBuffer.

The JACK wrapper also provides it:

https://docs.rs/jack/0.7.0/jack/struct.RingBuffer.html#method.mlock

Windows might need special treatment with https://docs.rs/winapi/0.3.9/winapi/um/memoryapi/fn.VirtualLock.html?

Do we have to expose mlock() as unsafe?

Probably it would be better to simply provide an (unsafe?) method to expose the underlying memory as a pointer?

WriteChunk: Unwritten slots are leaked!

The second part of this comment isn't true anymore:

// Safety: All slots have been initialized in From::from() and there are no destructors.

... since I removed the Copy trait bound in 148cf65.

If commit(n) is called with n smaller than the chunk size, the remaining slots (which have been Default-initialized before) will be leaked.

This can be fixed by calling drop_in_place() on the affected slots.

Weird allocation method

The RingBuffer is allocated by creating a Vec, extracting its pointer, and forgetting the Vec.
Just using alloc::alloc::alloc(Layout::array::<T>(capacity).unwrap()) seems more natural.
Why is it done this way?

Comparison against alternative crates?

Hi, I'm considering using this crate but am unsure whether the performance is any better than the other wait-free SPSC ring buffers. A comparison to crossbeam_channel might also be merited, since I could see using their bounded queues in a very similar way to the ring buffer.

Add an API for Consumer to read all slots?

Now, I read all slots from Consumer by this:

  let result = consumer.read_chunk(consumer.slots());

But sometimes I don't care how many slots are held by the RingBuffer, I just want to read all slots.

Do you think it's a good idea to add a method for Consumer like this:

  let (all_slots_size, chunks) = consumer.read_all();

as_mut_slices for ReadChunk

This is for an extremely niche use case with smoltcp. From what I understand, this could just be added with the current API, since the ReadChunk already has exclusive access to its data.

Add methods for copying slices of data into and out of the ring buffer

This has been discussed in #57.

I think the API for writing would be straightforward and the implementation could use the existing write_chunk_uninit() method.

I'm less sure about the API for reading. I guess users could provide an already initialized slice onto which the data will be copied.
However, it might be more efficient in some cases if the user could provide some kind of uninitialized slice.

I think it would be useful to provide those methods for T: Copy types, because under the hood memcpy() or memmove() will be used for maximum efficiency.
I'm not sure if it would also make sense for less constrained types.

Exposing the `buffer` field of Consumer and Producer is unsound

The Arc<RingBuffer> field on Consumer and Producer is made public, which is unsound. Overwriting the buffer field without updating head and tail can cause a read from uninitialized memory without unsafe code in downstream crates.

If the RingBuffer needs to be exposed, then I would recommend a method returning a shared reference instead of a public field.

Return `&mut [T]` from `copy_to_uninit()`?

I initially thought that this wouldn't be necessary, but I came up with one hypothetical situation:

If there are multiple slices that are supposed to be all added up and written to the ring buffer, the first slice could use copy_to_uninit() and the remaining ones would add themselves to the already written data.
However, for (safely) adding Ts a &mut [T] would be needed.

This would be a breaking change.

Separate `struct`s for fixed-size chunks?

Currently it is possible to create a ring buffer with a given number of chunks with RingBuffer::with_chunks(..., ...) (introduced in #17). This guarantees that - if only the "chunks" API is used - the second slice of each chunk will be empty.

I think this is good because it allows a simple and very efficient usage.
However, what's not good is that this goes behind the back of the compiler. The second slice is still presented to the user and only the user (and not the compiler) knows that it's empty.

I think it would be good to add something like ChunkProducer and ChunkConsumer that only provide single fixed-size slices and they don't allow single-element push()/pop() at all.

Note that both ends of a RingBuffer (producer and consumer) can independently have the "fixed chunk size" property.
This means there should be four different constructors, yielding

  • (Producer, Consumer)
  • (ChunkProducer, Consumer)
  • (Producer, ChunkConsumer)
  • (ChunkProducer, ChunkConsumer)

The with_chunks() constructor should probably be deprecated or completely removed.
The split() function probably has to be replaced with something else that allows the four different cases mentioned above.
This would very likely be a breaking change, requiring a version bump to 0.2.

The "classic" Producer and Consumer would still have the write_chunk()/read_chunk() API, which allows changing the chunk sizes during runtime.

The ChunkProducer could have an API like this:

fn write_slice(&mut self) -> WriteSlice;
unsafe fn write_slice_uninit(&mut self) -> WriteSliceUninit;

The WriteSlice could probably implement DerefMut<Target = [T]> to allow writing to the chunk and it could have a Drop implementation that commits the written data to the ring buffer.
WriteSliceUninit would implement DerefMut<Target = [MaybeUninit<T>]>. Calling write_slice_uninit() would be unsafe because the user would have to ensure that the whole slice is initialized before Drop::drop() is called (including when a panic occurs).

The ChunkConsumer could have this API:

fn read_slice(&mut self) -> ReadSlice;

The ReadSlice would implement Deref<Target = [T]> to allow reading data and Drop to advance the read index.

I'm not sure if that's necessary, but it could probably also have something like this:

fn peek_slice(&self) -> &[T];

Does that sound like a good idea?
