spscqueue's Introduction

SPSCQueue.h

A single-producer, single-consumer, wait-free and lock-free fixed-size queue written in C++11. In the benchmarks below, this implementation is faster than both boost::lockfree::spsc and folly::ProducerConsumerQueue.

Example

SPSCQueue<int> q(1);
auto t = std::thread([&] {
  while (!q.front());
  std::cout << *q.front() << std::endl;
  q.pop();
});
q.push(1);
t.join();

See src/SPSCQueueExample.cpp for the full example.

Usage

  • SPSCQueue<T>(size_t capacity);

    Create an SPSCQueue holding items of type T with the given capacity. The capacity must be at least 1.

  • void emplace(Args &&... args);

    Enqueue an item using in-place construction. Blocks if the queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using in-place construction. Returns true on success and false if the queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Returns true on success and false if queue is full. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true.

  • T *front();

    Return pointer to front of queue. Returns nullptr if queue is empty.

  • void pop();

    Dequeue the first item of the queue. You must ensure that the queue is non-empty before calling pop(). This means that front() must have returned a non-null pointer before each call to pop(). Requires std::is_nothrow_destructible<T>::value == true.

  • size_t size();

    Return the number of items available in the queue.

  • bool empty();

    Return true if queue is currently empty.

Only a single writer thread can perform enqueue operations and only a single reader thread can perform dequeue operations. Any other usage is invalid.

Huge page support

In addition to supporting custom allocation through the standard custom allocator interface, this library also supports standard proposal P0401R3, "Providing size feedback in the Allocator interface". This allows convenient use of huge pages without wasting any allocated space. Size feedback is only supported when C++17 is enabled.

The library currently doesn't include a huge page allocator since the APIs for allocating huge pages are platform dependent and handling of huge page size and NUMA awareness is application specific.

Below is an example huge page allocator for Linux:

#include <sys/mman.h>

#include <cstddef> // size_t
#include <new>     // std::bad_alloc

template <typename T> struct Allocator {
  using value_type = T;

  struct AllocationResult {
    T *ptr;
    size_t count;
  };

  size_t roundup(size_t n) { return (((n - 1) >> 21) + 1) << 21; }

  AllocationResult allocate_at_least(size_t n) {
    size_t count = roundup(sizeof(T) * n);
    auto p = static_cast<T *>(mmap(nullptr, count, PROT_READ | PROT_WRITE,
                                   MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
                                   -1, 0));
    if (p == MAP_FAILED) {
      throw std::bad_alloc();
    }
    return {p, count / sizeof(T)};
  }

  void deallocate(T *p, size_t n) { munmap(p, roundup(sizeof(T) * n)); }
};

See src/SPSCQueueExampleHugepages.cpp for the full example on how to use huge pages on Linux.
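
The rounding in the allocator above can be checked in isolation. The following is a small sketch of that arithmetic only, assuming 2 MiB huge pages (the shift by 21 in the example); `kHugePageShift` and `usable_count` are names introduced here for illustration and are not part of the library.

```cpp
#include <cstddef>

// Assumption: 2 MiB huge pages, matching the shift by 21 used above.
constexpr std::size_t kHugePageShift = 21;

// Round a byte count (n >= 1) up to the next multiple of 2 MiB.
constexpr std::size_t roundup(std::size_t n) {
  return (((n - 1) >> kHugePageShift) + 1) << kHugePageShift;
}

// Hypothetical helper: how many elements of size elem_size fit in the
// rounded-up mapping when n elements are requested. This is the count
// that allocate_at_least reports back to the caller.
constexpr std::size_t usable_count(std::size_t elem_size, std::size_t n) {
  return roundup(elem_size * n) / elem_size;
}

static_assert(roundup(1) == 2097152, "one byte still maps a whole huge page");
static_assert(roundup(2097152) == 2097152, "exact multiples are unchanged");
static_assert(roundup(2097153) == 4194304, "one byte over needs a second page");
static_assert(usable_count(4, 1000) == 524288,
              "a request for 1000 4-byte items can use 524288 slots");
```

The last assertion illustrates why size feedback matters: without it, a queue asking for 1000 small items would leave most of the mapped huge page unused.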

Implementation

Memory layout

The underlying implementation is based on a ring buffer.

Care has been taken to avoid any issues with false sharing. The head and tail indices are aligned and padded to the false sharing range (cache line size). Additionally, the slots buffer is padded with the false sharing range at the beginning and end, which prevents false sharing with any adjacent allocations.
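
As a rough illustration of the index alignment described above, the sketch below places two atomic indices on separate cache lines. It assumes a 64-byte cache line; the `Indices` struct and `index_distance` helper are hypothetical and do not reproduce the library's actual members.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines (common on x86-64, not universal).
constexpr std::size_t kCacheLineSize = 64;

// Hypothetical layout: each index starts on its own cache line, so a
// store to one index does not invalidate the line holding the other.
struct Indices {
  alignas(kCacheLineSize) std::atomic<std::size_t> writeIdx{0};
  alignas(kCacheLineSize) std::atomic<std::size_t> readIdx{0};
};

static_assert(alignof(Indices) == kCacheLineSize,
              "the struct itself is cache-line aligned");
static_assert(sizeof(Indices) == 2 * kCacheLineSize,
              "each index occupies a full cache line including padding");

// Distance in bytes between the two indices of a given object.
inline std::size_t index_distance(const Indices &i) {
  const auto w = reinterpret_cast<std::uintptr_t>(&i.writeIdx);
  const auto r = reinterpret_cast<std::uintptr_t>(&i.readIdx);
  return static_cast<std::size_t>(r - w);
}
```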

This implementation achieves higher throughput than a typical concurrent ring buffer by locally caching the head and tail indices in the writer and reader respectively. The caching increases throughput by reducing the amount of cache coherency traffic.

To understand how that works, first consider a read operation in the absence of caching: the head index (read index) needs to be updated, and thus that cache line is loaded into the L1 cache in exclusive state. The tail (write index) needs to be read in order to check that the queue is not empty, and is thus loaded into the L1 cache in shared state. Since a queue write operation needs to read the head index, it's likely that a write operation requires some cache coherency traffic to bring the head index cache line back into exclusive state. In the worst case there will be one cache line transition from shared to exclusive for every read and write operation.

Next consider a queue reader that caches the tail index: if the cached tail index indicates that the queue is empty, then load the tail index into the cached tail index. If the queue was non-empty, multiple read operations up until the cached tail index can complete without stealing the writer's tail index cache line's exclusive state. Cache coherency traffic is therefore reduced. An analogous argument can be made for the queue write operation.
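
The index caching described above can be sketched as follows. This is a minimal illustration for `int` items, assuming a 64-byte cache line; the class name `CachedIndexRing` and its members are hypothetical, and the code is not the library's actual implementation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Minimal sketch: one producer thread calls try_push, one consumer
// thread calls front and pop.
class CachedIndexRing {
public:
  explicit CachedIndexRing(std::size_t capacity)
      : slots_(capacity + 1) {} // one spare slot distinguishes full from empty

  // Producer thread only.
  bool try_push(int v) {
    const std::size_t w = writeIdx_.load(std::memory_order_relaxed);
    const std::size_t next = (w + 1 == slots_.size()) ? 0 : w + 1;
    if (next == readIdxCache_) {
      // The cached view says "full": only now touch the shared read index.
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (next == readIdxCache_) return false;
    }
    slots_[w] = v;
    writeIdx_.store(next, std::memory_order_release);
    return true;
  }

  // Consumer thread only.
  int *front() {
    const std::size_t r = readIdx_.load(std::memory_order_relaxed);
    if (r == writeIdxCache_) {
      // The cached view says "empty": only now touch the shared write index.
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (r == writeIdxCache_) return nullptr;
    }
    return &slots_[r];
  }

  // Consumer thread only; the queue must be non-empty.
  void pop() {
    const std::size_t r = readIdx_.load(std::memory_order_relaxed);
    assert(writeIdx_.load(std::memory_order_acquire) != r);
    readIdx_.store((r + 1 == slots_.size()) ? 0 : r + 1,
                   std::memory_order_release);
  }

private:
  static constexpr std::size_t kCacheLineSize = 64; // assumed line size
  std::vector<int> slots_;
  // Producer-owned line: write index plus the producer's cached read index.
  alignas(kCacheLineSize) std::atomic<std::size_t> writeIdx_{0};
  std::size_t readIdxCache_ = 0;
  // Consumer-owned line: read index plus the consumer's cached write index.
  alignas(kCacheLineSize) std::atomic<std::size_t> readIdx_{0};
  std::size_t writeIdxCache_ = 0;
};
```

In this sketch the consumer loads the shared write index only when its cached copy suggests the queue is empty, so a burst of items can be drained with a single acquire load of the producer's cache line.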

This implementation allows for arbitrary non-power-of-two capacities by allocating an extra queue slot to indicate a full queue. If you don't want to waste storage on an extra queue slot, you should use a different implementation.
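
The index arithmetic behind the extra slot can be sketched as below. The helper names are hypothetical; `slots` stands for the number of allocated slots, assumed here to be the requested capacity plus one.

```cpp
#include <cstddef>

// Advance an index with wrap-around; works for any slot count,
// not only powers of two.
constexpr std::size_t next_index(std::size_t i, std::size_t slots) {
  return (i + 1 == slots) ? 0 : i + 1;
}

// Empty: both indices point at the same slot.
constexpr bool is_empty(std::size_t read, std::size_t write) {
  return read == write;
}

// Full: advancing the write index would make it collide with the read
// index, so one slot always stays unused.
constexpr bool is_full(std::size_t read, std::size_t write, std::size_t slots) {
  return next_index(write, slots) == read;
}

// Number of items currently stored.
constexpr std::size_t item_count(std::size_t read, std::size_t write,
                                 std::size_t slots) {
  return write >= read ? write - read : slots - read + write;
}

// Capacity 3 means 4 allocated slots.
static_assert(is_empty(0, 0), "fresh queue is empty");
static_assert(is_full(0, 3, 4), "three items fill a capacity-3 queue");
static_assert(item_count(2, 1, 4) == 3, "count is correct after wrap-around");
static_assert(is_full(2, 1, 4), "a wrapped queue can also be full");
```

Without the spare slot, `read == write` would be ambiguous between empty and full, which is why one slot of storage is given up.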

Testing

Testing lock-free algorithms is hard. I'm using two approaches to test the implementation:

  • A single-threaded test that the functionality works as intended, including that the item constructor and destructor are invoked correctly.
  • A multi-threaded fuzz test that verifies that all items are enqueued and dequeued correctly under heavy contention.

Benchmarks

The throughput benchmark measures throughput between 2 threads for a queue of int items.

The latency benchmark measures round trip time between 2 threads communicating using 2 queues of int items.

Benchmark results for an AMD Ryzen 9 3900X 12-Core Processor, with the 2 threads running on different cores on the same chiplet:

Queue                          Throughput (ops/ms)   Latency RTT (ns)
SPSCQueue                      362723                133
boost::lockfree::spsc          209877                222
folly::ProducerConsumerQueue   148818                147

Cited by

SPSCQueue has been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

About

This project was created by Erik Rigtorp.

spscqueue's People

Contributors

dfa1, frabert, rigtorp, saracen24

spscqueue's Issues

The alignment of Slot?

I think the alignment of Slot can avoid false sharing between items, like how we do in MPMCQueue.
Should we also use an alignment of Slot in an SPSCQueue?

Non-blocking push?

Is it possible to tweak this such that the push methods do not block when full, but instead pop from the other end like a classic ring buffer?

If it requires significant changes, perhaps you could publish that separately?

Minimum Capacity

Is the minimum capacity 2 or 3? The example uses 2 but then it says "Capacity need to be greater than 2."

capacity

Example: for a 'capacity' of 10 provided to the ctor, only 9 items can be pushed to the queue.
I understand this is not a bug, but at least the documentation should state so.
The best would have been: for a capacity of X specified by the client, use X - 1 as real capacity. But it's too late to change.
This would also have made it clearer why the capacity must be at least 2.

mmap IPC allocator example

Hi there,

I'm trying to use your awesome queue to do inter-process shared memory communication on Linux.

I think it can be done with some modification to the huge page allocator example, but I couldn't come up with a correct implementation.

If IPC is possible, could you please add an IPC demo to the example? Many thanks.

add T * back() and pop_back actions

at the moment, front() and pop only modify the front of the queue

eg

add 1
add 2
[1,2]
front() == 1
pop
[2,0]
push 1
[2,1]

but I need to be able to modify the back of the queue instead

add 1
add 2
[1,2]
back() == 2
pop_back
[1,0]
push 2
[1,2]

Small things

First of all, kudos for your cache-aware indices. Though the underlying logic is not obvious (!), it does work and improves performance.

Two small things, not new to version 1.1:

  1. The line:
    char padding_[kCacheLineSize - sizeof(writeIdxCache_)]
    has no effect as the global structure has a size that is already quantized, because of alignas(kCacheLineSize) members.

  2. The way you compute kPadding is not optimal, because if sizeof(T) == kCacheLineSize, then kPadding is 1, while 0 would be better.

static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;
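
To make the second point concrete, the expression can be evaluated for a few element sizes. This is only a worked example of the arithmetic, assuming a 64-byte cache line; `padding_slots` is a hypothetical helper that takes the element size as a parameter in place of `sizeof(T)`.

```cpp
#include <cstddef>

// Assumption for illustration: 64-byte cache lines.
constexpr std::size_t kCacheLineSize = 64;

// Same expression as kPadding above, with sizeof(T) passed in explicitly.
constexpr std::size_t padding_slots(std::size_t elem_size) {
  return (kCacheLineSize - 1) / elem_size + 1;
}

static_assert(padding_slots(4) == 16, "16 * 4 bytes is exactly one cache line");
static_assert(padding_slots(24) == 3, "3 * 24 = 72 bytes, rounded up past 64");
static_assert(padding_slots(64) == 1, "the case raised above: one padding slot");
static_assert(padding_slots(128) == 1, "the expression never yields zero");
```

The expression is a ceiling division of the cache line size by the element size, so it always reserves at least one slot of padding regardless of how large the element is.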

Windows compilation

Hi Erik,

It does not compile under MS Windows; you use some GNU-specific features!

Thx

Provide a way to let consumer know push failed

The use case is, if consumer is slow to keep up with incoming events in the queue, consumer can know about it and decide to quit, instead of making bad decisions based on stale information.

One easy way I can think of, is to have a failedPushCount that can be checked by user. User can decide to continue if failedPushCount(number of dropped messages) is small and stop if the number exceeds a pre-defined threshold.

It is up to the main implementation. The most important feature I am looking for is to notify consumer if push fails.

Thanks.
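
One way the request above could be approached without changing the queue is a thin wrapper on the producer side. The sketch below is hypothetical: `DropCountingProducer`, `push_or_drop` and `failed_push_count` are names invented here, and `StubQueue` is a stand-in used only to exercise the wrapper. It assumes only that the wrapped queue offers `try_push(const T &)` as documented in the README.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical wrapper around any queue exposing try_push(const T &).
template <typename Queue, typename T> class DropCountingProducer {
public:
  explicit DropCountingProducer(Queue &q) : q_(q) {}

  // Producer side: never blocks. If the queue is full the item is
  // dropped and the drop counter is incremented.
  bool push_or_drop(const T &v) {
    if (q_.try_push(v)) return true;
    dropped_.fetch_add(1, std::memory_order_relaxed);
    return false;
  }

  // Consumer side: number of items dropped so far.
  std::size_t failed_push_count() const {
    return dropped_.load(std::memory_order_relaxed);
  }

private:
  Queue &q_;
  std::atomic<std::size_t> dropped_{0};
};

// Stand-in queue for demonstration only: accepts `room` items, then
// reports full. A real SPSC queue would take its place.
struct StubQueue {
  std::size_t room;
  bool try_push(const int &) {
    if (room == 0) return false;
    --room;
    return true;
  }
};

// Push five items into a queue with room for two and report the drops.
inline std::size_t demo_dropped() {
  StubQueue q{2};
  DropCountingProducer<StubQueue, int> producer(q);
  for (int i = 0; i < 5; ++i) producer.push_or_drop(i);
  return producer.failed_push_count();
}
```

The consumer can poll `failed_push_count()` and compare it against a threshold of its choosing; the counter is atomic so it can be read from the consumer thread while the producer updates it.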

Assertion failed

Hi!

I've been using SPSCQueue for some time and just updated to the latest version and now hit this:

SPSCQueue.hpp:188: void Rigtorp::SPSCQueue<T, Allocator>::pop() [with T = PacBio::CCS::Chunk; Allocator = std::allocator<PacBio::CCS::Chunk>]: Assertion `writeIdx_.load(std::memory_order_acquire) != readIdx' failed.

Any advice? Thank you!

optimize front() for burst of data

Currently, front() always does head_.load(std::memory_order_acquire). I think it can load(std::memory_order_relaxed) first, then try to load(std::memory_order_acquire). This will help when the producer has a lot of data coming in at the same time.

Facebook folly queue has a pull request for this. facebook/folly#1133. Not sure why it was not accepted into master.

Error with __cpp_lib_hardware_interference_size on MacOS llvm-g++ 12.x

The check for __cpp_lib_hardware_interference_size in include/rigtorp/SPSCQueue.h seems to incorrectly indicate this feature is available in MacOS llvm-g++ but it's not really implemented.

Not sure how this should be checked correctly, but for example following post suggests instead

#if __cpp_lib_hardware_interference_size >= 201603

This was seen on MacOS 10.15 with xcode CLI tools 12.1 and llvm-g++ version clang-1200.0.32.21

COMPILE ERROR Mac OS - hardware_destructive_interference_size is not found

https://jira.mongodb.org/browse/SERVER-44270 only issue i can find that is related to this

__cpp_lib_hardware_interference_size is defined even though std::hardware_destructive_interference_size is not found

when using

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

this does not occur when using

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

or

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

wrong comment

for the method:
try_push(P &&v)
the comment ends with "Blocks if queue is full"
(which is false)

Read multiple elements at once

Is there any way to access elements, without copying them first?
I've been using direct access to the underlying memory with front(), but that requires batches of constant size and total capacity being a multiple of that size.
I don't care. I just want to read as much contiguous data as possible. So from front() either all elements or to the end of the underlying storage.
Also a fast way to delete them afterwards would be nice.

Any suggestions/ideas? Is this even "legal"?

Edit: I've seen in another issue, the bipbuffer has been recommended. However, is there any modern implementation? One that would support SPSC, with all the safety features like for example this lib?

Edit2: I ended up choosing the bipbuffer from ETL library.
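
For reference, the "as much contiguous data as possible" idea from this issue reduces to a small piece of index arithmetic. The sketch below is hypothetical: `contiguous_readable` is not part of the library, which as documented exposes only front() and pop(), and `slots` is assumed to be the number of allocated slots in the ring.

```cpp
#include <cstddef>

// Number of items that can be read from `read` onwards without wrapping:
// either everything up to the write index, or everything up to the end
// of the underlying storage if the data wraps around.
constexpr std::size_t contiguous_readable(std::size_t read, std::size_t write,
                                          std::size_t slots) {
  return write >= read ? write - read : slots - read;
}

static_assert(contiguous_readable(2, 6, 8) == 4, "no wrap: slots 2 to 5");
static_assert(contiguous_readable(6, 2, 8) == 2, "wrapped: only slots 6 and 7");
static_assert(contiguous_readable(3, 3, 8) == 0, "empty queue");
```

A batch read built on this would still have to publish the new read index once after consuming the items, which is the part a bip buffer packages up.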

Memory buffer

How can I use the SPSC queue as a memory queue? I want to use it for dumping binary data, which I will read from a different thread and write to files.
