Turbocharged.Beanstalk

A Beanstalk .NET client library filled with async happiness.

Don't like async? That's cool, no problem. You might like libBeanstalk.NET instead.

Usage

Do the normal thing:

PM> Install-Package Turbocharged.Beanstalk

Because of the way the Beanstalk protocol works, it's important that producers and consumers use separate connections. So, when creating a BeanstalkConnection, you need to choose whether it's a consumer or producer.

Producing Jobs

Create a Producer if you need to insert jobs. Most producer methods are affected by UseAsync(tube).

IProducer producer = await BeanstalkConnection.ConnectProducerAsync("localhost:11300");
await producer.UseAsync("mytube");

Beanstalk jobs are just blobs, so jobs are represented as byte arrays.

byte[] job = new byte[] { 102, 105, 101, 116, 123, 124, 101, 114, 113 };
await producer.PutAsync(job, 5, TimeSpan.Zero, TimeSpan.FromSeconds(30));
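If the payload starts out as text, one way to get a byte array is to encode it first. The sketch below uses only System.Text.Encoding from the .NET base class library. JobPayload, ToBytes and FromBytes are hypothetical helper names, and UTF-8 is an assumed choice rather than something the library requires.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of Turbocharged.Beanstalk.
public static class JobPayload
{
    // Encode text as UTF-8 so it can be handed to PutAsync as a byte[].
    public static byte[] ToBytes(string text) => Encoding.UTF8.GetBytes(text);

    // Decode the bytes of a reserved job back into text.
    public static string FromBytes(byte[] data) => Encoding.UTF8.GetString(data);
}
```

The result of JobPayload.ToBytes("some text") could then be passed as the first argument of PutAsync in place of the literal array above.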

Not feeling the love for byte arrays? You can also put custom objects and they'll be serialized. The default is JSON.

await producer.PutAsync<MyObject>(obj, 5, TimeSpan.Zero, TimeSpan.FromSeconds(30));

Since Beanstalk maintains a TCP connection, you need to clean up your toys when you're done:

producer.Dispose();

Consuming jobs

If you need to consume jobs, create a Consumer instead. Most consumer methods are affected by WatchAsync(tube) and IgnoreAsync(tube).

IConsumer consumer = await BeanstalkConnection.ConnectConsumerAsync("localhost:11300");
await consumer.WatchAsync("mytube");

To ask Beanstalk for a job, reserve it:

Job job = await consumer.ReserveAsync();
// or, with a timeout:
// Job job = await consumer.ReserveAsync(timeout: TimeSpan.FromSeconds(10));

Console.WriteLine("Reserved job ID = {0}, Length = {1}", job.Id, job.Data.Length);

You can also deserialize if you know what type you're expecting.

Job<MyObject> job = await consumer.ReserveAsync<MyObject>();

When you're done with your job, ask your consumer to delete it or bury it.

if (success)
    await consumer.DeleteAsync(job.Id);
else
    await consumer.BuryAsync(job.Id, priority: 5);

Again, clean up after yourself when you don't need the connection anymore:

consumer.Dispose();

Creating a worker task

A worker task is a BeanstalkConnection that processes jobs in a loop.

  1. You provide a delegate with signature Func<IWorker, Job, Task>. Turbocharged.Beanstalk immediately connects and calls "reserve" for you.

  2. Your delegate gets called whenever a job is reserved.

  3. Call DeleteAsync or BuryAsync when you're finished.

It looks like this:

private async Task MyWorkerFunc(IWorker worker, Job job)
{
    bool success = ProcessJob(job.Data);
    if (success)
        await worker.DeleteAsync(job.Id);
    else
        await worker.BuryAsync(job.Id, 1);
}

IDisposable worker = await BeanstalkConnection.ConnectWorkerAsync(hostname, port, MyWorkerFunc);

You can also use serialized messages:

private async Task MyTypedWorkerFunc(IWorker worker, Job<MyObject> job)
{
    bool success = ProcessJob(job.Object);
    if (success)
        await worker.DeleteAsync(job.Id);
    else
        await worker.BuryAsync(job.Id, 1);
}

IDisposable worker = await BeanstalkConnection.ConnectWorkerAsync<MyObject>(hostname, port, MyTypedWorkerFunc);

As usual, dispose the worker to make it stop.

worker.Dispose();

Goals

  • Simple API that encourages ease of use
  • Teach myself how to properly use the shiny asynchrony features in C# 5.0.

License

The MIT License. See LICENSE.md.

turbocharged.beanstalk's Issues

Worker stops monitoring of tube after a period of time (~2 hours)

Hi

First off, I should probably mention that I'm fairly new to C# .Net having worked with it for about 11 months now, so forgive me if this is more a case of ignorance than an actual issue. I just wasn't sure where else I could post questions and I can't seem to find anything about this in the documentation.

I have built a long running tray application that has a worker which watches a queue on a central server and processes any incoming jobs that arrive in that queue. It works great, however, after a while (my shortest test was 2 hours) I notice that it no longer processes any new jobs and the jobs just sit there in the queue.

I realize that .NET has a built-in garbage collector, and I have put GC.KeepAlive(queueWorker); at the end of my Main program class in the hope that the GC will ignore my worker object. I also realize that ConnectWorkerAsync returns a Task<IDisposable>, which may be why my KeepAlive attempt is not working.

Could you provide any advice as to how I can ensure that my worker continues for the full lifespan of the application while its process is running?

Your advice would be most appreciated.

Rethink ReserveAsync() and friends

Beanstalk processes requests one at a time. Every Beanstalk command returns immediately except reserve and reserve-with-timeout, both of which block the processing pipeline. A consumer who calls ReserveAsync() therefore blocks any other calls on that connection until a job is reserved or the timeout elapses.

Since ReserveAsync() is the only method that blocks the connection, it might make sense for reservations to require opening a dedicated connection. That one connection blocks, but the initial connection remains free for use.

Reading Message Stream Fails To Read Expected Number Of Bytes

Hi, we have bumped into a couple of scenarios where ReserveRequest.TryGetJobFromBuffer fails to read messages because the underlying NetworkStream has not yet delivered all the bytes indicated in the message header. The shortfall causes an exception to be thrown:

_tcs.SetException(new Exception("Unable to parse job description"));

The problem can be fixed with an iterative approach making a number of calls to read the expected number of bytes in chunks.
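A minimal sketch of that iterative approach, using only the standard Stream API, might look like the following. StreamReading and ReadFullyAsync are hypothetical names and this is not the library's actual code. It relies on the documented behavior that a single read may return fewer bytes than requested and returns 0 only at end of stream.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, not part of Turbocharged.Beanstalk.
public static class StreamReading
{
    // Keeps reading until exactly `count` bytes have arrived.
    // A single ReadAsync call may return fewer bytes than requested,
    // so the loop continues from the current offset each time.
    public static async Task<byte[]> ReadFullyAsync(Stream stream, int count)
    {
        var buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = await stream.ReadAsync(buffer, offset, count - offset);
            if (read == 0)
            {
                // The connection closed before the whole body was received.
                throw new EndOfStreamException(
                    "Stream ended after " + offset + " of " + count + " bytes.");
            }
            offset += read;
        }
        return buffer;
    }
}
```

With a helper like this, the byte count from the message header would be passed as count, and parsing would begin only once the full body is in the buffer.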

Up to this point we have run millions of messages through the tubes using a simple reserve/process/delete model. The problem only appeared when we began to explore multiple reserves (around 10 messages) with message bodies of around 25K in size.

The same issues may surface in the PeekRequest and several other places where the stream.Read assumes that all bytes are ready and waiting within the stream.

Is the repo still active?
Thanks in advance

Manage a pool of connections

Currently the BeanstalkConnection object maintains a single connection to Beanstalk. That means it can process messages one at a time, with no cancellation. For example, a "reserve" command blocks all other commands until a job is reserved or the timeout expires (this is how Beanstalk appears to work under the hood).

There could be a new top-level BeanstalkPool object which opens new TCP connections to Beanstalk as necessary. Considerations:

  • If different users of the pool want to use/watch different tubes, does every combination need to get its own connection? One connection can't use multiple tubes at once, but it would be possible to always precede a command with a use command for the tube it was meant for. The equivalent of always doing this:

    await prod.UseAsync("my-tube");
    await prod.PutAsync(...);
    await prod.UseAsync("my-tube");
    await prod.PutAsync(...);
  • Consumers need to be isolated from producers, or else a "reserve" followed by a "put" could deadlock the connection.
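The "use before every put" idea from the first consideration could be sketched as a thin wrapper that sends both commands as one unit, so callers sharing a connection cannot interleave. Everything here is hypothetical: ITubeProducer is a cut-down stand-in for the real producer interface, TubeSwitchingProducer is an invented name, and RecordingProducer is only a test double to make the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the two producer calls shown above.
public interface ITubeProducer
{
    Task UseAsync(string tube);
    Task PutAsync(byte[] job);
}

// Hypothetical wrapper: issues "use" then "put" under a lock so that
// concurrent callers cannot slip a different "use" in between.
public sealed class TubeSwitchingProducer
{
    private readonly ITubeProducer _inner;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public TubeSwitchingProducer(ITubeProducer inner)
    {
        _inner = inner;
    }

    public async Task PutAsync(string tube, byte[] job)
    {
        await _gate.WaitAsync();
        try
        {
            await _inner.UseAsync(tube);
            await _inner.PutAsync(job);
        }
        finally
        {
            _gate.Release();
        }
    }
}

// Test double that records the order of calls instead of talking to a server.
public sealed class RecordingProducer : ITubeProducer
{
    public List<string> Calls { get; } = new List<string>();

    public Task UseAsync(string tube)
    {
        Calls.Add("use " + tube);
        return Task.CompletedTask;
    }

    public Task PutAsync(byte[] job)
    {
        Calls.Add("put " + job.Length);
        return Task.CompletedTask;
    }
}
```

The trade-off is an extra command per put, in exchange for letting one connection serve callers that target different tubes.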

Built-in serialization of objects rather than byte arrays

I imagine most .NET consumers actually want to store objects, not byte arrays, in Beanstalk. We could expose PutAsync<T> and ReserveAsync<T> methods that serialize the objects:

string str = "hello";
await producer.PutAsync<string>(str, 1, 0, 1);
var result = await consumer.ReserveAsync<string>();
// result.Object == "hello"

Questions:

  • What if we can't deserialize the object, if somebody put something weird in the tube? Do we bury the message? Return it as a Job but with a null .Object property? Throw an exception?
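One of the options in that question, reporting the failure to the caller without throwing, could be sketched as a Try-style helper. This sketch assumes System.Text.Json as the serializer, which may not be what the library uses, and JobDecoding and TryDeserialize are hypothetical names. The caller keeps the raw bytes and can decide whether to bury the job.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper, not part of Turbocharged.Beanstalk.
public static class JobDecoding
{
    // Returns true and sets `value` when the payload is valid JSON for T.
    // Returns false when the payload is malformed or deserializes to null.
    public static bool TryDeserialize<T>(byte[] data, out T value)
    {
        try
        {
            value = JsonSerializer.Deserialize<T>(data);
            return value != null;
        }
        catch (JsonException)
        {
            // Something unexpected was put in the tube.
            value = default;
            return false;
        }
    }
}
```

A consumer might call TryDeserialize on job.Data and bury the job when it returns false, which keeps the "weird" message available for inspection.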
