filipdulic / bus-queue Goto Github PK
View Code? Open in Web Editor NEWLock free bounded non blocking pub sub queue
Lock free bounded non blocking pub sub queue
Depends on #36
This flavor will use a single arc-swap slot for the transmission of items and should be used only in extreme cases where the highest importance is given to the reception of the latest data. This flavor will be optimized for this use case, but care should be taken in use as frequent data drops are to be expected.
The current implementation of the Trait Stream for Subscriber returns only one item from the buffer. Make an alternative version, perhaps called VectoredSubscriber, whose stream would always return a vector of items filled will all the items from the channel until the try_recv method returns TryRecvError::Empty.
This would optimize Task generation, and Task wake-ups in async operations.
I previously (0.4.1) used the sync API, where I could just do a let msg = rx.try_recv().unwrap();
and the unwrap
would only fail if the other end disconnected.
My use case is that I have a producer thread and several consumer threads that are just waiting for messages.
How is the new API intended to be used? Currently I do something like the following:
loop {
let msg = match self.rx.try_recv() {
Ok(msg) => msg,
Err(channel::TryRecvError::Empty) => continue,
Err(channel::TryRecvError::Disconnected) => panic!("Sender disconnected"),
};
}
On the receiver which seems to work, but uses full CPU, so probably I should add a sleep in the Empty
branch:
loop {
let msg = match self.rx.try_recv() {
Ok(msg) => msg,
Err(channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1));
continue;
},
Err(channel::TryRecvError::Disconnected) => panic!("Sender disconnected"),
};
}
Is this how the API should be used? Or should I use a different crate for my use case?
test_subscriber_item_drop_related_to_ratio_of_timing test is currently commented out, as it is failing sporadically. Investigate and fix.
Currently you can change the default exports by selecting *-export features. This is unnecessary, cumbersome and does not allow for calling the --all-features cargo/ci flag as exports are exclusive.
Since all flavors are tested in their SwapSlot implementations, there is no need to run the full test suite on each one.
fix up gihub workflows ci to use --all-features instead of separate iterations for different flavors.
The bare example can be transformed into a property test where an arbitrary vector will be passed on input, along with the length of the buffer. The result should be a function that validates if the consumed vector is equal to the last N elements of the input vector, where N is the length of the bus.
bus
--> async_bus
channel
--> bus
Sender
--> Publisher
Receiver
--> Subscriber
Publisher
--> AsyncPublisher
Subscriber
--> AsyncSubscriber
Channel
--> RingBuffer
raw_bounded
--> bounded
bounded
--> async_bounded
The writer and reader indices are of the type usize which is a 32 or 64 bit unsigned integer depending on the target's address space. The edge case where the writer index has overflown pass the usize::MAX value is not covered by the internal queue logic.
For this to be an issue on 64 bit targets more then 18,446,744,073,709,551,615 items need to have been written to the queue.
It would take more then 500 years for this to happen if a value is written to the queue every 1ns.
The internal Channel struct is used in crosbeam-channel and async-std. Refactor the code to provide a similar implementation. Also use the flavors logic to enable multiple versions of the inner Channel.
etc...
Major refactor:
Implement a special version of Channel that uses a single SwapSlot for it's buffer. This is a special case where the Receivers always want the latest data when an item is dropped.
While there would be a performance increase for this specialized use case, as he try_recv method would be much simpler, there would be a penalty in that more items would be dropped, because the Sender will overwrite the old one on every broadcast call.
This should only be used in systems where even the smallest performance benefits outweigh potential loss of data. For all other use cases a much more stable option would be to use a buffer of size, and set the skip_items Receiver field to size - 1.
AtomicArc is an alternative to the ArcSwap library the bus-queue uses. Provide an alternative that can be tested and bench-marked.
AtomicArc resides in the atomic repo, but is currently unpublished on crates.io, copy the content of the files, with appropriate considerations to the author.
Since all the SwapSlot implementations in flavers are covered by tests, remove the Slot export and it's test in swap_slot.rs.
This flavor will implement Channel where RwLocks will be used instead of ArcSwap. The intention behind this is to provide a non lock-free version that focuses on stability and data-race avoidance. While the lock-free version is implemented to the best of our abilities, the sheer complexity of the lock-free domain requires formal verification where critical systems depend on this library, this will provide an intern solution, using the same interfaces, until such a time that a formal verification of the lock-free version is provided.
Try using clippy to help find small improvements to clarity and performance. As well as the code issues pointed out by KrishnaSannasi.
Current is_empty implementation on Receiver and Subscriber only check if wi == 0;
Should be implemented as ri == wi for Receivers and Subscriber. Perhaps add is_receiver_empty on Channel.
wi = 10, ri = 0, size = 10, item = y, buffer[0] = x.
y is returned even though x should have been, or sub should have moved to next.
Datarace resulting with the reader reading the most recent value added to the queue, instead of the oldest one, as well as a possible double read of the same value.
The example below will demonstrate how the datarace will happen.
The scenario where this happens begins with the writer overtaking the reader index by the size of the queue, in this example the queue size is 3 and the writer has written 3 values (0,1,2) into the queue, and incremented it's writer index to 3.
+--------+-----------+
| size | 3 |
+--------+---+---+---+
| index | 0 | 1 | 2 |
+--------+---+---+---+
| values | 0 | 1 | 2 |
+--------+---+---+---+
| wi | 3 | | |
+--------+---+---+---+
| ri | 0 | | |
+--------+---+---+---+
If while the writer inserts the 4th value (3) into the queue, but before it increments it's index (wi), the Reciever's try_recv method is called.
+--------+-----------+
| size | 3 |
+--------+---+---+---+
| index | 0 | 1 | 2 |
+--------+---+---+---+
| values | 3 | 1 | 2 |
+--------+---+---+---+
| wi | 3 | | |
+--------+---+---+---+
| ri | 0 | | |
+--------+---+---+---+
All of the try_recv conditional statements will pass, and the value 3 would be returned. This will result in the reader reading a 3, while expecting 0. If the reader continues to read it will eventually read the 3 value again.
Some of the exported structs, traits and functions require long export paths making the library and documentation usage cumbersome.
I've been playing around with the library and it's really nice.
The motivation for this issue is that primary structs that I'm passing through the bus are Bytes (https://docs.rs/bytes/0.4.12/bytes/struct.Bytes.html) and tuples of Enums and bytes. Bytes already have the main body of memory enclosed in an Arc.
This is a little uncomfortable because I essentially have a Arc<Arc<..>> for no real reason now.
I tried to switch out Send for Clone but came across the fact that the buffer is updated by use of ArcSwap. Replacing ArcSwap for a RwLock hurts performance a bit (predictably).
The other alternative here for me would be to swap the Arc for ArcSwap in the bytes library, but this is a bit of a momentous task.
Is there any advice for some sort of work around here? Thanks
fix up SwapSlot::load() implementations for all flavors to cover None option, use it in flavor tests by expecting panic.
let result = std::panic::catch_unwind(|| SwapSlot::load(&item));
assert!(result.is_err());
Implement the Eq Trait on all versions of Publisher and Subscriber struts.
I have failed to use this module with rust 2018:
error: expected identifier, found reserved keyword `async`
--> engine/src/engine.rs:7:16
|
7 | use bus_queue::async::{channel, Publisher, Subscriber};
| ^^^^^ expected identifier, found reserved keyword
Because async
now is reserved keyword in Rust, so the async
submodule should be renamed.
As a quick solution I suggest just add single underscore at end (i.e. async_
).
Current implementation v0.3.10 does not provide sync trait.
Ordering is very important for certain operations in broadcast, and try_recv. Investigate weather the std::sync::atomic::fence needs to be added to these functions where appropriate.
Example async-simple fails assertion, vector [1,2,3,4,5] is being sent, while it's being checked against [1,2,3,3,5]. Reported by OptimisticPeach.
Currently we have ambiguity when calling a SwapSlot
implementation, e.g. ArcSwapOption
already has load
and store
methods.
Implement a wrapper struct for each flavor, something in terms of:
pub struct Slot<T>(ArcSwapOption<T>);
...
let item = Slot::<i32>::none();
item.store(1);
let arc = item.load();
The author of piper publisher the Event and EventListener struct in a new event-listener crate. Remove the piper source files and replace them by importing event-listener;
Every ordering used is Ordering::Relaxed, which means sending a message from one thread and receiving it from another is a data race. At the very least, Ordering::Acquire and Ordering::Release should be used somewhere. - reported by stjepang
For the implementation of flavors the SwapSlot Trait will be used.
This is a syncing primitive that the Channel will use to swap in and load atomically counted references into and from the buffer;
SwapSlot requires three methods:
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.