Giter Club home page Giter Club logo

durable-queue's Introduction

Clojars Project cljdoc badge CircleCI

This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size is bounded by available disk rather than memory. It is a small, purely-Clojure implementation focused entirely on the in-process use case, meaning that it is both simpler and more easily embedded than network-aware queue implementations such as Kafka and ActiveMQ.

usage

[factual/durable-queue "0.1.5"]

To interact with queues, first create a queues object by specifying a directory in the filesystem and an options map:

> (require '[durable-queue :refer :all])
nil
> (def q (queues "/tmp" {}))
#'q

This allows us to put! and take! tasks from named queues. take! is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses:

> (take! q :foo 10 :timed-out!)
:timed-out!
> (put! q :foo "a task")
true
> (take! q :foo)
< :in-progress | "a task" >
> (deref *1)
"a task"

Notice that the task has a value describing its progress, and a value describing the task itself. We can get the task descriptor by dereferencing the returned task. Note that since the task is persisted to disk and anything on disk may be corrupted, this involves a checksum which may fail and throw an IOException. Any software which wants to be robust to all failure modes should always dereference within a try/catch clause.

Calling take! removed the task from the queue, but just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as complete!.

> (put! q :foo "another task")
true
> (take! q :foo)
< :in-progress | "another task" >
> (complete! *1)
true

If our task fails and we want to re-enqueue it to be tried again, we can instead call (retry! task). Tasks which are marked for retry are added to the end of the current queue.

To get a description of the current state of the queue, we can use stats, which returns a map of queue names onto various counts:

> (stats q)
{:enqueued 2,
 :retried 0,
 :completed 1,
 :in-progress 1,
 :num-slabs 1,
 :num-active-slabs 1}
field description
:enqueued the number of tasks which have been enqueued via put!, including any pre-existing tasks on-disk when the queues were initialized
:retried the number of tasks which have been retried via retry!
:completed the number of tasks which have been completed via complete!
:in-progress the number of tasks which have been consumed via take!, but are not yet complete
:num-slabs the number of underlying files which are being used to store tasks
:num-active-slabs the number of underlying files which are currently open and mapped into memory

configuring the queues

queues can be given a number of different options, which can affect its performance and correctness.

By default, it is assumed all tasks are idempotent. This is necessary, since the process can die at any time and leave an in-progress task in an undefined state. If your tasks are not idempotent, a :complete? predicate can be defined which, on instantiation of the queues object, will scan through all pre-existing task descriptors and remove those for which the predicate returns true.

A complete list of options is as follows:

name description
:complete? a predicate for identifying already completed tasks, defaults to always returning false
:max-queue-size the maximum number of elements that can be in the queue before put! blocks, defaults to Integer/MAX_VALUE
:slab-size The size, in bytes, of the backing files for the queue. The size of a serialized task cannot be larger than this size, defaults to 64mb.
:fsync-put? Whether an fsync should be performed for each put!. Defaults to true.
:fsync-take? Whether an fsync should be performed for each take!. Defaults to false.
:fsync-threshold The maximum number of writes (puts, takes, retries, completes) that can be performed before an fsync is performed.
:fsync-interval The maximum amount of time, in milliseconds, that can elapse before an fsync is performed.

Disabling :fsync-put? will risk losing tasks if a process dies. Disabling :fsync-take? increases the chance of a task being re-run when a process dies. Disabling both will increase throughput of the queue by at least an order of magnitude (in the default configuration, ~1.5k tasks/sec on rotating disks and ~6k tasks/sec on SSD, with fsync completely disabled ~100k tasks/sec independent of hardware).

Writes can be batched using fsync-threshold and/or fsync-interval, or by explicitly calling (durable-queue/fsync q). Setting the fsync-threshold to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set.

license

Copyright © 2015 Factual Inc

Distributed under the Eclipse Public License 1.0

durable-queue's People

Contributors

drbobbeaty avatar jaju avatar seancorfield avatar shepmaster avatar slipset avatar ztellman avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

durable-queue's Issues

When can a statistic be negative?

[Question]
Here's one I see in my process.
"out" {:enqueued 8332, :retried 0, :completed 8333, :in-progress -1}

I'm unsure how to handle this, as I depend on the values to decide how to progress (completion marking etc.)

Thanks!

on put! java.lang.NoSuchMethodError: java.util.zip.Checksum.update([B)V

I assume this is a version conflict, either of Java or Clojure or durable-queue. My code works fine on my local Macbook Pro, but I created a new EC2 instance with Ubuntu 18.04 and then I added Java:

java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-0ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)

On the EC2 instance I get this error on put!

INFO: java.lang.NoSuchMethodError: java.util.zip.Checksum.update([B)V
  java.lang.NoSuchMethodError: java.util.zip.Checksum.update([B)V
 at durable_queue$checksum.invokeStatic (durable_queue.clj:64)
    durable_queue$checksum.invokePrim (durable_queue.clj:-1)
    durable_queue.TaskSlab.append_to_slab_BANG_ (durable_queue.clj:314)
    durable_queue$queues$reify__6549$slab_BANG___6570.invoke (durable_queue.clj:702)
    durable_queue$queues$reify__6549$fn__6575.invoke (durable_queue.clj:719)
    durable_queue$queues$reify__6549.put_BANG_ (durable_queue.clj:717)
    durable_queue$queues$reify__6549.put_BANG_ (durable_queue.clj:732)
    humongorous_nlp.core$cycle_to_database$fn__13673$fn__13693.invoke (core.clj:665)
    humongorous_nlp.core$cycle_to_database$fn__13673.invoke (core.clj:663)
    clojure.lang.AFn.run (AFn.java:22)
    java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    java.util.concurrent.FutureTask.runAndReset (FutureTask.java:308)
    java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301 (ScheduledThreadPoolExecutor.java:180)
    java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:294)
    java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    java.lang.Thread.run (Thread.java:748)

Is this a matter of upgrading Java, or is this a matter of doing a data conversion before calling put! ?

I'm puzzled why this works on my Mac but not the EC2 instance.

Java 13 compatiblity

This library has been a workhorse across java updates. It looks like a bit of work will be required to maintain dependability with the release of java 13 which touched on many aspects of java.nio. The type-specific Buffer classes of java.nio now have absolute (as opposed to relative) bulk get and put methods.

java.nio.ByteBuffer and the other buffer types in java.nio now define absolute bulk get and put methods to transfer contiguous sequences of bytes without regard to or effect on the buffer position.

After upgrading to java 13, I started seeing the following error:

No matching method put found taking 2 args for class java.nio.DirectByteBuffer

Linking back to the source:
https://github.com/Factual/durable-queue/blob/83452cbcc4c15a2244747e77b54a96a23e027691/src/durable_queue.clj#L176

I hope it's a small thing to fix.

Deletes files that look like slabs

I had another process writing temporary files with names matching #"\w+_\d+" (2015_05_01_xxxxxx.zip) to the same ./tmp directory where durable-queue stored its slabs. After a while, I noticed those temporary files occasionally disappear before the worker process would get a chance to clean them up. Turned out, durable-queue was deleting them, thinking they were empty slabs.

While, I suppose, I shouldn’t have taken the /tmp example in README so seriously, at least it seems worth noting in the docs that the directory is better be dedicated to the process. A stricter regexp (i.e. #"^\w+_\d+$") could also help reduce the risk of collisions.

Permissions Issues on /tmp/* Directories

I've noticed an annoying issue that seems like it might be avoidable.

Let's say that I'm using durable-queue in a project:

(defonce q (queues "/tmp" {}))

and then you're off and having fun and it's working great, and your MacBook Pro connects to some WiFi hotspot at home - or work, and macOS creates a bunch of temp files:

/tmp/wifi-01-25-2017_12:34:56.log

and when the code starts, there are permission issues on these files, and the code exceptions. When I delete these log files, everything is fine.

My question is - can these files in /tmp that aren't durable queue data files - be excluded from the search on startup?

Thread safety of take!, put!

Hi,

Should we consider put! and take! threadsafe?

I have one thread doing puts in bursts at regular interval, and N workers on their own respective threads consuming the queue. Sometimes I get into a situation where I have 1 in progress task and all other workers waiting on tasks while the queue keeps growing (essentially deadlocked at this point). In the code I take!, deref and mark completed tasks immediately (did that to exclude a bug on complete!).
Should I serialize the takes or is this a potential bug? I ll try to come up with a repro at work tomorrow.

I also get the occasional negative in-progress while testing:

09:22:14.938 [clojure-agent-send-off-pool-0] INFO sink.testdq - {foo {:num-slabs 1, :num-active-slabs 1, :enqueued 10947, :retried 0, :completed 10947, :in-progress -1}}

Thanks for the lib by the way, it s super useful.

Slabs can grow larger than max-queue-size

In the put! function, slab! is called before queue!, writing the data to the slab before offering to the queue.

               (if-let [val (locking q
                              (queue!
                                (vary-meta (slab!) assoc
                                  ::this this-ref
                                  ::queue-name q-name
                                  ::queue q
                                  ::fsync? fsync-take?)))]

The consequence is that the slab files can get larger than the max-queue-size, and upon application restart, exceptions get raised when the slabs are loaded and don't fit into the queue.

(IllegalArgumentException.
                             "'max-queue-size' insufficient to hold existing tasks."))

Dashes converted to underscores

Hey @ztellman. Thanks for the awesome library!

I noticed that queue names with dashes are automatically converted to underscores. Intentional?

(def dir "/tmp")
(def conn (d/queues dir {}))
(d/put! conn "a-b-c" 42)
(d/stats conn)
;; => {"a_b_c" {:in-progress 0, :completed 0, :retried 0, :enqueued 1, :num-slabs 1, :num-active-slabs 1}}

calling complete! on already completed task messes up stats

I'm not sure if this is as expected, but I found this behavior when I was trying stuff in REPL.

> (dq/put! q :test "test task")
> (def t (dq/take! q :test))
> (dq/complete! t)
true
> (get (dq/stats q) "test")
{:in-progress 0, :completed 1, :retried 0, :enqueued 1, :num-slabs 1, :num-active-slabs 1}
> (dq/complete! t)
true
> (get (dq/stats q) "test")
{:in-progress -1, :completed 2, :retried 0, :enqueued 1, :num-slabs 1, :num-active-slabs 1}

Is this expected behavior and the caller is responsible of keeping track of task status?
If so, is there possibility of making status function public, so that we can check the status of the task?(currently it's private)

Update to Clojure 1.8 - byte-streams 0.2.2

I'm starting to work with this library and Clojure 1.8.0, and as-is, this isn't working well - but if I preload the byte-streams v0.2.2 and exclude that from durable-queues, then it seems to work just fine.

I would suggest upgrading to [byte-streams "0.2.2"] in this project, and then it should be fine.

Use cases

Hi,

I've successfully implemented a durable queue according to the stated use case, as an in-process device that does both the puts and takes. However, initially I had a misgiving, and I had two processes, one doing the puts, the other the takes. This did not work. Despite the fact that the queue object was pointing to the same underlying file, the puts of one process did not result in tasks visible to takes in the second process. Now I am just curious, what are the technical reasons for this. Intuitively, I though that no further synchronization was needed beyond fsync to make this work.

Thank you in advance.

names with . characters do not save properly.

> (put! q "example.com" "foo")
true
> (stats q)
{"example.com" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}}
> (def q (queues "/tmp"))
> (stats q)
{"com" {:num-slabs 1, :num-active-slabs 0, :enqueued 1, :retried 0, :completed 0, :in-progress 0}}

From my testing, it appears all the characters before the last '.' in the queue name is lost when reloading the queue.

Less durable queue

Would it be possible to create a version of the queue that would not be durable for testing purposes? Maybe you could pass an optional parameter to set it to in memory on mode.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.