arthurprs / floki Goto Github PK
View Code? Open in Web Editor NEWFloki Message Queue
License: MIT License
Floki Message Queue
License: MIT License
I built and tried helloworld example on mac-book and both client/server are crashing.
Floki log:
RUST_BACKTRACE=1 RUST_LOG=floki=info cargo run
Running `target/debug/floki`
INFO:floki: starting up
INFO:floki::server: Opening queues...
INFO:floki::server: Opening queue "my_queue"
INFO:floki::queue_backend: [my_queue] checkpoint loaded: QueueBackendCheckpoint { segments: [SegmentCheckpoint { tail: 1, head: 125, sync_offset: 5584, closed: false }] }
WARN:floki::queue_backend: ["./data/my_queue/000000000000001.data"] expected id 125 got 0 when recovering @5584
INFO:floki::queue: [my_queue] checkpoint loaded: Ready
INFO:floki: starting event loop
INFO:floki::queue_backend: [my_queue] checkpointed: [SegmentCheckpoint { tail: 1, head: 125, sync_offset: 5584, closed: false }]
INFO:floki::queue: [my_queue] checkpointed: Ready
INFO:floki::queue_backend: [my_queue] checkpointed: [SegmentCheckpoint { tail: 1, head: 125, sync_offset: 5584, closed: false }]
INFO:floki::queue: [my_queue] checkpointed: Ready
INFO:floki::server: assigned token Token(1) to client V4(127.0.0.1:61418)
INFO:floki::server: creating queue "my_queue" channel "channel_1"
INFO:floki::server: creating queue "my_queue" channel "channel_2"
INFO:floki::server: assigned token Token(2) to client V4(127.0.0.1:61419)
INFO:floki::server: assigned token Token(3) to client V4(127.0.0.1:61420)
INFO:floki::server: assigned token Token(4) to client V4(127.0.0.1:61421)
INFO:floki::server: assigned token Token(5) to client V4(127.0.0.1:61422)
INFO:floki::server: closing token Token(2) client
thread '<main>' panicked at 'assertion failed: `(left == right)` (left: `Readable | Writable`, right: `Readable`)', src/server.rs:834
stack backtrace:
1: 0x10316c718 - sys::backtrace::tracing::imp::write::h32380eb2460086d2mWt
2: 0x10316e04f - panicking::log_panic::_<closure>::closure.41658
3: 0x10316dad1 - panicking::log_panic::h5bd130ecc1f616afyPx
4: 0x10315e4d6 - sys_common::unwind::begin_unwind_inner::hf3f0868d52ab85beoZs
5: 0x10315eafe - sys_common::unwind::begin_unwind_fmt::h1d52f0c1e48facd2uYs
6: 0x10304a589 - server::Server::ready::h6f4963e29b5ad0f8Pzd
7: 0x1030572fe - server::ServerHandler.Handler::ready::h203c0a070f4bffd2aNd
8: 0x102f448c3 - event_loop::EventLoop<H>::io_event::io_event::h17452341913126285399
9: 0x102f447d9 - event_loop::EventLoop<H>::io_process::io_process::h3125720144754587274
10: 0x102f440e8 - event_loop::EventLoop<H>::run_once::run_once::h16836368678178756876
11: 0x102f43af1 - event_loop::EventLoop<H>::run::run::h947473040700634305
12: 0x102f237fc - main::h54061b420d29a3340hg
13: 0x10316d832 - sys_common::unwind::try::try_fn::h4737994515490642073
14: 0x10316bb58 - __rust_try
15: 0x10316d6d9 - rt::lang_start::h79f614af887db3afGMx
16: 0x102f46cd9 - main
INFO:floki::queue_backend: [my_queue] checkpointed: [SegmentCheckpoint { tail: 1, head: 157, sync_offset: 7024, closed: false }]
INFO:floki::queue: [my_queue] checkpointed: Ready
Process didn't exit successfully: `target/debug/floki` (exit code: 101)
helloword example:
target/debug/examples/helloworld
channel_1 received msg: id 15 ticket 1831916728403976237 body "message 1 from producer 1"
channel_1 received msg: id 16 ticket 3075441350875930991 body "message 2 from producer 1"
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Connection reset by peer (os error 54)thread '', thread '<unnamed>../src/libcore/result.rs<unnamed>' panicked at ':' panicked at 'called `Result::unwrap()` on an `Err` value: Could not read enough bytesassertion failed: `(left == right)` (left: `Err(Could not read enough bytes)`, right: `Ok(2)`)688thread '', ',
<unnamed>../src/libcore/result.rsexamples/helloworld.rs' panicked at ':called `Result::unwrap()` on an `Err` value: Could not read enough bytes688',
../src/libcore/result.rs:688
:31
thread '<main>' panicked at 'called `Result::unwrap()` on an `Err` value: Any', ../src/libcore/result.rs:688
Amazon SQS has the concept of a receipt that's different from the Message ID, its used to delete (ack) the message afterwards. Also, if a messages timeout a new receipt is generated and the old one can't be used anymore.
I tried to mimic the concept here with a different name (ticket) but I'm unsure if it's actually useful. I use/used SQS extensively and I never found this actually useful.
Removing this features allow Floki to store a little bit less inflight metadata in memory.
I appreciate any inputs on this.
The current retention period implementation is a soft one. Garbage collection will still keep segments based on the smallest channel tail.
It'd be interesting to have a hard retention period that will forcefully remove stalled segments.
On 32-bit arm, using stable rust 1.5.0, I'm getting the following error:
Compiling floki v0.1.0 (file:///tmp/floki-master)
src/server.rs:109:23: 109:25 error: mismatched types:
expected `server::NotifyMessage`,
found `()`
(expected enum `server::NotifyMessage`,
found ()) [E0308]
src/server.rs:109 Ok(ok) => ok,
^~
src/server.rs:252:17: 252:96 note: in this expansion of try_or_error! (defined in src/server.rs)
src/server.rs:109:23: 109:25 help: run `rustc --explain E0308` to see a detailed explanation
src/server.rs:109:23: 109:25 error: mismatched types:
expected `server::NotifyMessage`,
found `()`
(expected enum `server::NotifyMessage`,
found ()) [E0308]
src/server.rs:109 Ok(ok) => ok,
^~
src/server.rs:256:17: 256:82 note: in this expansion of try_or_error! (defined in src/server.rs)
src/server.rs:109:23: 109:25 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:192:30: 192:38 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:192 ptr::null_mut(), file_len,
^~~~~~~~
src/queue_backend.rs:191:25: 194:34 note: in this expansion of try! (defined in <std macros>)
src/queue_backend.rs:192:30: 192:38 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:197:13: 197:21 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:197 file_len,
^~~~~~~~
src/queue_backend.rs:195:9: 198:37 note: in this expansion of try! (defined in <std macros>)
src/queue_backend.rs:197:13: 197:21 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:293:56: 293:74 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:293 mman::msync(self.file_mmap as *mut c_void, sync_offset as u64, mman::MS_SYNC).unwrap();
^~~~~~~~~~~~~~~~~~
src/queue_backend.rs:293:56: 293:74 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:308:13: 308:36 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:308 self.sync_offset as u64,
^~~~~~~~~~~~~~~~~~~~~~~
src/queue_backend.rs:306:9: 309:35 note: in this expansion of try! (defined in <std macros>)
src/queue_backend.rs:308:13: 308:36 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:384:29: 384:49 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:384 mman::madvise(mmap, self.file_len as u64, mman::MADV_DONTNEED).unwrap();
^~~~~~~~~~~~~~~~~~~~
src/queue_backend.rs:384:29: 384:49 help: run `rustc --explain E0308` to see a detailed explanation
src/queue_backend.rs:385:28: 385:48 error: mismatched types:
expected `u32`,
found `u64`
(expected u32,
found u64) [E0308]
src/queue_backend.rs:385 mman::munmap(mmap, self.file_len as u64).unwrap();
^~~~~~~~~~~~~~~~~~~~
src/queue_backend.rs:385:28: 385:48 help: run `rustc --explain E0308` to see a detailed explanation
error: aborting due to 8 previous errors
Could not compile `floki`.
One of my planned features is to allow user to do a channel "seek" using either a timestamp or an id. This is specially useful now that it has soft/hard retention periods.
All infrastructure is in place but someone needs to figure out the best REDIS command to map this with.
This is what Floki does right now
RPUSH push one or more messages
RPUSH queue_name message1 [message2, ...]
Returns the number of messages inserted
HMGET get one or more messages
HMGET queue_name channel_name number_or_messages [long_pooling_timeout]
Returns an array with three items for each message. The first would be the id, the second is the ticket (for acknowledging, see bellow) and the third would be the message itself.
Note: Floki will return as soon as there's one message available
HMSET seeks the channel
HMGET queue_name channel_name TS|ID seek_timestamp|seek_id
Seeks the specified channel to the specified id or timestamp.
Note: Floki won't error if the id or timestamp is either in the future or is already gone from the underlying storage. If it's set to a non-existent past, gets will just return the first available message.
HDEL ack messages
HDEL queue_name channel_name ticket1 [ticket2, ...]
Acknowledge messages using their tickets. Returns the number of successful acknowledges.
MSET create queue/channel
MSET queue_name channel_name
Creates the specified queue and channels, will error if the queue already exists
DEL delete queue/channel
DEL queue_name [channel_name]
Deletes a channel, if specified, otherwise deletes the queue.
SREM purge queue/channel
SREM queue_name channel_name_or_*
Note: you can also use * as the channel name to purge all channels, effectively purging the entire queue and allowing all used disk space to be reclaimed.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.