Giter Club home page Giter Club logo

smoothmq's Introduction

SmoothMQ

SmoothMQ is a drop-in replacement for SQS with a much smoother developer experience. It has a functional UI, observability, tracing, message scheduling, and rate-limiting. SmoothMQ lets you run a private SQS instance on any cloud.

Survey!

I'd love your feedback on the direction of this project! https://forms.gle/m5iMjcA5Xvp685Yw8

Getting Started

SmoothMQ deploys as a single go binary and can be used by any existing SQS client.

Running

This will run a UI on :3000 and an SQS-compatible server on :3001.

$ go run . server

Connecting

This works with any SQS client in any language.

Python

import boto3

# Simply change the endpoint_url
sqs = boto3.client("sqs", ..., endpoint_url="http://localhost:3001")
sqs.send_message(QueueUrl="...", MessageBody="hello world")

Celery works seamlessly:

app = Celery("tasks", broker_url="sqs://...@localhost:3001")

UI

The UI lets you manage queues and search individual messages.

Dashboard UI

smoothmq's People

Contributors

adilkhash avatar chasesc avatar kentshikama avatar poundifdef avatar sundbry avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

smoothmq's Issues

Does not work for aws js client

I've forked your repo so things are a little different since i'm using nginx to port forward from 80 to 3001. But otherwise should be mostly compatible:

const AWS = require('aws-sdk');
const dotenv = require('dotenv');
const { promisify } = require('util');
const path = require('path');

AWS.config.logger = console;

// .env file is located in the root of the project
const envFile = path.join(__dirname, '../../.env');

// dotenv.config();
dotenv.config({ path: envFile });

// assert that the AWS_SECRET_ACCESS_KEY is set
if (!process.env.AWS_SECRET_ACCESS_KEY) {
    console.error('AWS_SECRET_ACCESS_KEY is not set');
    process.exit(1);
} else {
    // print out the contents of the env file
    const parsed = dotenv.parse(require('fs').readFileSync(envFile, 'utf-8'));
    console.log("\nContents of .env file:");
    Object.entries(parsed).forEach(([key, value]) => {
        console.log(`${key}=${value}`);
    });
    console.log("");
}

const sleep = promisify(setTimeout);

async function createOrGetQueue(sqs, queueName) {
    try {
        const response = await sqs.listQueues({ QueueNamePrefix: queueName }).promise();
        if (response.QueueUrls && response.QueueUrls.length > 0) {
            const queueUrl = response.QueueUrls[0];
            console.log(`Queue already exists: ${queueUrl}`);
            return [queueUrl, false];
        } else {
            const response = await sqs.createQueue({ QueueName: queueName }).promise();
            const queueUrl = response.QueueUrl;
            console.log(`Created queue: ${queueUrl}`);
            return [queueUrl, true];
        }
    } catch (error) {
        console.error('Error in createOrGetQueue:', error);
        throw error;
    }
}

async function runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId) {
    const params = {
        apiVersion: "2012-11-05",
        region: 'us-east-1',
        accessKeyId: awsAccessId,
        secretAccessKey: awsSecretAccessKey,
        endpoint: new AWS.Endpoint(endpointUrl),
        signatureVersion: 'v4',
    }
    console.log('Creating SQS client with params:', params)
    const sqs = new AWS.SQS(params);

    const queueName = 'my-test-que-for-testing';
    let queueUrl, queueCreated;

    try {
        [queueUrl, queueCreated] = await createOrGetQueue(sqs, queueName);
        console.log(`Queue URL: ${queueUrl}`);

        await sqs.sendMessage({ QueueUrl: queueUrl, MessageBody: 'hello world' }).promise();
        console.log('Sent a message to the queue');

        const receiveResponse = await sqs.receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 1 }).promise();
        if (receiveResponse.Messages && receiveResponse.Messages.length > 0) {
            const message = receiveResponse.Messages[0];
            console.log(`Received message: ${message.Body}`);

            await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }).promise();
            console.log('Deleted the message');
        } else {
            console.log('No messages in the queue');
        }

        await sleep(2000);
    } catch (error) {
        console.error('Error in runSqsTest:', error);
    } finally {
        if (queueUrl) {
            console.log(`Destroying queue: ${queueUrl}`);
            await sqs.deleteQueue({ QueueUrl: queueUrl }).promise();
            console.log(`Destroyed queue: ${queueUrl}`);
        }
    }
}

async function main() {
    const awsSecretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const awsAccessId = process.env.AWS_ACCESS_KEY_ID;
    //const endpoints = ['http://localhost', 'https://jobs.kumquat.live'];
    const endpoints = ['http://localhost'];

    for (const endpointUrl of endpoints) {
        console.log(`\nTesting with endpoint: ${endpointUrl}`);
        await runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId);
    }
}

main().catch(error => console.error('Error in main:', error));

What's weird is that I can create a queue, but not send a message to it.

ListQueues: Does not support QueueNamePrefix

The QueueNamePrefix of the ListQueues rpc is not implemented. This lead to some surprising bugs in my application, as I used ListQueues to detect if a named queue exists or not!

SQS method AmazonSQS.GetQueueUrl not implemented

When trying an existing application that uses the nodejs @aws-sdk/client-sqs library to connect to sqs, I get the following error:

SQS method AmazonSQS.GetQueueUrl not implemented

I suppose that is a part of the sqs api that SmoothMQ doesn't implement (yet)?

Creating a duplicate queue should give a proper error code

When creating a duplicate queue name, the server responds with a 500 status code. It should instead provide a 4xx class error / code compatible with the SQS API, so that clients can easily distinguish between failure to create a queue because of duplication, and actual internal server errors / unavailability.

In this case, I believe InvalidParameterValue is the correct error code.

Java SDK exception:

com.amazonaws.services.sqs.model.AmazonSQSException: UNIQUE constraint failed: queues.tenant_id, queues.name (Service: AmazonSQS; Status Code: 500; Error Code: InternalFailure; Request ID: nu
ll; Proxy: null)

Queue names are not case sensitive

According to the SQS documentation, queue names should be case sensitive. However, during testing it appears they are all converted to lowercase.

CreateQueue

QueueName
The name of the new queue. The following limits apply to this name:
A queue name can have up to 80 characters.
Valid values: alphanumeric characters, hyphens (-), and underscores (_).
A FIFO queue name must end with the .fifo suffix.
Queue URLs and names are case-sensitive.

Celery workers do not consume tasks with HTTPS on MacOS

When running SmoothMQ with HTTPS, we can add tasks with Celery but workers do not pick them up. This is only on MacOS - everything works as expected on Linux.

Here's a code example:

The Problem

from celery import Celery

app = Celery(
    "tasks",
    broker_url='sqs://7399377469860198052:27018ec9cb39d6d4d09b0f0378d3991c@7399377469860198052.demo.smoothmq.com:3001',
    broker_transport_options={'is_secure': True}
)

@app.task
def hello():
    return 'hello world'

if __name__ == '__main__':
        hello.delay()

This will successfully place the task on a queue, which you can see on this dashbaord. However, the worker will not pick up tasks:

celery -A tasks worker -l DEBUG --concurrency 1

It will successfully create queues, list queues, and prepare to call ReceiveMessage, but then loops infinitely:

[2024-08-04 16:07:29,165: DEBUG/MainProcess] Event choose-signer.sqs.ReceiveMessage: calling handler <function set_operation_specific_signer at 0x105778360>
[2024-08-04 16:07:29,166: DEBUG/MainProcess] Calculating signature using v4 auth.
[2024-08-04 16:07:29,166: DEBUG/MainProcess] CanonicalRequest:
POST
/

content-type:application/x-amz-json-1.0
host:7399377469860198052.demo.smoothmq.com:3001
x-amz-date:20240804T200729Z
x-amz-target:AmazonSQS.ReceiveMessage

content-type;host;x-amz-date;x-amz-target
1a762bf8b84958772e686dd62bcc0d4c11957890e258359aaf2d6c9f894d2c12
[2024-08-04 16:07:29,166: DEBUG/MainProcess] StringToSign:
AWS4-HMAC-SHA256
20240804T200729Z
20240804/us-east-1/sqs/aws4_request
3d33e059354446a823a67b47de5ad55fdb7ac7326d99013d3c147ceb4c84a6cf
[2024-08-04 16:07:29,166: DEBUG/MainProcess] Signature:
10cb9d37b40482dc470e6b17b556f7373a43399d6b3e39d65b0494d0192b5904

Interestingly, the following scenarios do work:

Running SmoothMQ locally

This will successfully connect and consume messages - non-HTTPS:

$ go run . server
app = Celery(
    "tasks",
    broker_url='sqs://DEV_ACCESS_KEY_ID:DEV_SECRET_ACCESS_KEY@localhost:3001',
    broker_transport_options={'is_secure': False}
)

Kombu without Celery

This will also successfully consume messages, even over HTTPS:

from kombu import Connection

with Connection(f'sqs://7399377469860198052:27018ec9cb39d6d4d09b0f0378d3991c@7399377469860198052.demo.smoothmq.com:3001', transport_options={'is_secure':True}) as conn:
    simple_queue = conn.SimpleQueue('q1')

    simple_queue.put('simple queue message')

    message = simple_queue.get(block=True, timeout=1)
    print(f'Received: {message.payload}')
    message.ack()
    simple_queue.close()

Invalid signature

Keep getting invalid signature error when trying to send a message to a queue. Tried using fake aws credentials, and tried using the request library instead of boto3. Keep getting same invalid signature error. Appreciate any help!

Enhance Docker/Kubernetes Compatibility: Consider Moving Database to Subdirectory

I've been loving the project and I'm looking to deploy it in a Docker/Kubernetes environment. I had a thought that might make this process smoother for others too:

Would it be possible to move the database to a subdirectory within the project structure? This small change could greatly simplify deploying smoothMQ in containerized environments like Docker and Kubernetes.

Create a release

Hey, I'm coming from HN.

To help the community to test your tool, could you create a release on github or a docker image.

Good luck with this project.

Improve speed

Nice work. Coming here via HN.

There have been a few examples on HN on how to dramatically speed up sqlite access with a few configuration tweaks. I've encapsulated these in https://pkg.go.dev/github.com/nicois/fastdb . Looking at your code, I think you would support significantly greater throughput under heavy load when using this approach, splitting our your SELECT calls from mutating database calls.

I can make a PR if you like, using the above module, or you might choose to implement these few settings yourself directly. The key settings can be seen here: https://github.com/nicois/fastdb/blob/main/main.go#L97

SQS method AmazonSQS.SendMessageBatch not implemented

Hi there,

Trying out your library it looks really promising thanks.

I'm using this library (https://github.com/bbc/sqs-producer) but can't send messages & get the following error SQS method AmazonSQS.SendMessageBatch not implemented.

2:59PM WRN zerolog.go:84 > Client error error="UnsupportedOperation: SQS method AmazonSQS.SendMessageBatch not implemented" ip=192.168.214.4 latency="276.969µs" method=POST status=400 url=/

Would be grateful if you could add this method.

Marketing tip: Show example monthly prices when hosting this

Hey! This looks really cool! Here's just a random quick thought I had:

If you were to look up the cheapest hosting solution for this (presumably a VPS?) on various tiers and benchmark the maximum throughputs and concurrencies, that would really help stakeholders compare the pricing of this compared to AWS, which could help corporate adoption.

ChangeMessageVisibility not implemented

Stack trace from the Java AWS client when calling ChageMessageVisibility:

com.amazonaws.services.sqs.model.UnsupportedOperationException: SQS method AmazonSQS.ChangeMessageVisibility not implemented (Service: AmazonSQS; Status Code: 400; Error Code: UnsupportedOper
ation; Request ID: null; Proxy: null)

This would be good to have for completeness of the SQS protocol. I am using this as a "negative acknowledgement" for a message handler.

Please add tests, also check out my fork which greatly improves this repo and has a python/js test suite

This is my fork of your repo.

https://github.com/zackees/SmoothMQ

While your repo kinda works but is not really tested and doesn't work on Render/DigitalOcean, mine does. Additionally, I have added nginx and added auth so that the codebase can be put on the public web. I've also implemented persistent storage of the sqlite data at /var/data so a disk/volume can be attached so that the queue can survive reboots/redeploys

Here is a change list:

  • port 3000 -> localhost:80/ui and is protected with basic http authorization.
    • user: user
    • password: <AMAZON_SECRET_KEY>
  • port 3001 -> localhost:80/ and is protected with the amazon secret key
  • Can be dropped into Render.com / Digital Ocean
    • If you allocate a disk at /var/data then the SQS queue will be persistant across reboots/deploys
  • This code base has a test suite
    • javascript
    • python
  • Deleting of Queues is implemented

Also note that I created a minimal AWS sqs wrapper so that legacy js clients can use this package:

// const AWS = require('aws-sdk');  // this is replaced by sqs-legacy-adapter
const AWS = require('sqs-legacy-adapter');

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.