Giter Club home page Giter Club logo

stompest's Introduction

NOTE: Stompest has a new home

Thanks to Jan Müller, stompest has been upgraded to support both STOMP 1.0 and 1.1, along with transactions, receipt handling, and more. Version 1.x will no longer be maintained.

Version 1.x docs

stomp, stomper, stompest!

Stompest is a no-nonsense STOMP 1.0 implementation for Python including both synchronous and Twisted clients.

Modeled after the Perl Net::Stomp module, the synchronous client is dead simple. It does not assume anything about your concurrency model (thread vs process) or force you to use it any particular way. It gets out of your way and lets you do what you want.

The Twisted client is a full-featured STOMP protocol client built on top of the stomper library. It supports destination-specific message handlers, concurrent message processing, graceful shutdown, "poison pill" error handling, and connection timeout.

This module is thoroughly unit tested and production hardened for the functionality used by Mozes: persistent queueing on ActiveMQ. Minor enhancements may be required to use this STOMP adapter with other brokers or certain feature sets like transactions.

Examples

Note

If you use ActiveMQ to run these examples, make sure you enable the Stomp connector in the ActiveMQ config file, activemq.xml.

<transportConnector name="stomp"  uri="stomp://0.0.0.0:61613"/>

See http://activemq.apache.org/stomp.html for details.

Simple Producer

from stompest.simple import Stomp

QUEUE = '/queue/simpleTest'

stomp = Stomp('localhost', 61613)
stomp.connect()
stomp.send(QUEUE, 'test message1')
stomp.send(QUEUE, 'test message2')
stomp.disconnect()

Simple Consumer

from stompest.simple import Stomp

QUEUE = '/queue/simpleTest'

stomp = Stomp('localhost', 61613)
stomp.connect()
stomp.subscribe(QUEUE, {'ack': 'client'})

while(True):
    frame = stomp.receiveFrame()
    print "Got message frame: %s" % frame
    stomp.ack(frame)

stomp.disconnect()

Twisted Producer

import logging
import simplejson
from twisted.internet import reactor, defer
from stompest.async import StompConfig, StompCreator

class Producer(object):

    QUEUE = '/queue/testIn'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('localhost', 61613)
        self.config = config
    
    @defer.inlineCallbacks
    def run(self):
        #Establish connection
        stomp = yield StompCreator(self.config).getConnection()
        #Enqueue 10 messages
        try:
            for x in range(10):
                stomp.send(self.QUEUE, simplejson.dumps({'count': x}))
        finally:
            #Give the reactor time to complete the writes
            reactor.callLater(1, reactor.stop)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Producer().run()
    reactor.run()

Twisted Transformer

import logging
import simplejson
from twisted.internet import reactor, defer
from stompest.async import StompConfig, StompCreator

class IncrementTransformer(object):

    IN_QUEUE = '/queue/testIn'
    OUT_QUEUE = '/queue/testOut'
    ERROR_QUEUE = '/queue/testTransformerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('localhost', 61613)
        self.config = config
    
    @defer.inlineCallbacks
    def run(self):
        #Establish connection
        stomp = yield StompCreator(self.config).getConnection()
        #Subscribe to inbound queue
        headers = {
            #client-individual mode is only supported in AMQ >= 5.2 but necessary for concurrent processing
            'ack': 'client-individual',
            #this is the maximum messages the broker will let you work on at the same time
            'activemq.prefetchSize': 100, 
        }
        stomp.subscribe(self.IN_QUEUE, self.addOne, headers, errorDestination=self.ERROR_QUEUE)

    def addOne(self, stomp, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = simplejson.loads(frame['body'])
        data['count'] += 1
        stomp.send(self.OUT_QUEUE, simplejson.dumps(data))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    IncrementTransformer().run()
    reactor.run()

Twisted Consumer

import logging
import simplejson
from twisted.internet import reactor, defer
from stompest.async import StompConfig, StompCreator

class Consumer(object):

    QUEUE = '/queue/testOut'
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('localhost', 61613)
        self.config = config
    
    @defer.inlineCallbacks
    def run(self):
        #Establish connection
        stomp = yield StompCreator(self.config).getConnection()
        #Subscribe to inbound queue
        headers = {
            #client-individual mode is only supported in AMQ >= 5.2 but necessary for concurrent processing
            'ack': 'client-individual',
            #this is the maximum messages the broker will let you work on at the same time
            'activemq.prefetchSize': 100, 
        }
        stomp.subscribe(self.QUEUE, self.consume, headers, errorDestination=self.ERROR_QUEUE)

    def consume(self, stomp, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = simplejson.loads(frame['body'])
        print "Received msg with count %s" % data['count']

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Consumer().run()
    reactor.run()

Features

Simple

Basically the same as Net::Stomp

Twisted

  • Supports binary message bodies
  • Graceful shutdown - On disconnect or error, the client stops processing new messages and waits for all outstanding message handlers to finish before issuing the DISCONNECT command
  • Error handling - STOMP 1.0 does not have a NACK command so you have two options for error handling:
    • Client error handling - passing the errorDestination parameter to the subscribe() method will cause unhandled messages to be forwarded to that destination.
    • Disconnecting - if you do not configure an errorDestination and an exception propagates up from a message handler, then the client will gracefully disconnect. This is effectively a NACK for the message. You can configure ActiveMQ with a redelivery policy to avoid the "poison pill" scenario where the broker keeps redelivering a bad message infinitely.
  • Fully unit-tested including a simulated STOMP broker
  • Configurable connection timeout

Caveats

  • Tested with ActiveMQ versions 4.3 and above. Mileage may vary with other STOMP implementations

Twisted

  • Written before the advent of defer.inlineCallbacks so it could be simpler

Options

Twisted

  • StompCreator
    • alwaysDisconnectOnUnhandledMsg (defaults to False)
      • For backward-compatibility, you can set this option to True and the client will always disconnect in the case of an unhandled error in a message handler, even if an error destination has been configured.

Changes

  • 1.0.4 - Bug fix thanks to Njal Karevoll. No longer relies on newline after the null-byte frame separator. Library is now compatible with RabbitMQ stomp adapter.
  • 1.1.1 - Thanks to nikipore for adding support for binary messages.
  • 1.1.2 - Fixed issue with stomper adding a space in ACK message-id header. AMQ 5.6.0 no longer tolerates this.

stompest's People

Contributors

nkvoll avatar theduderog avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

stompest's Issues

Cannot connect to rabbitmq_stomp

I installed the stomp plugin for rabbitmq (rabbitmq_stomp.ez and amqp_client.ez) as per http://www.rabbitmq.com/stomp.html.

>>> from stompest.simple import Stomp
>>> QUEUE = '/queue/simpleTest'
>>> stomp = Stomp('localhost', 61613)
>>> stomp.connect()

After this, the python shell hangs indefinitely.

Testing against rabbitmq with telnet works as expected:

$ telnet localhost 61613
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
CONNECT

^@
CONNECTED
session:session-9Z1PNAe1n3GvJFeAApbMPg==
heart-beat:0,0
version:1.0

DISCONNECT

^@
Connection closed by foreign host.

While I was expecting some compatibility issues ("your milage may wary"), I was hoping basic functionality would be there :)

How to config exclusive=true ?

http://activemq.apache.org/exclusive-consumer.html

java code is

   queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
   consumer = session.createConsumer(queue);

I have try to config uri

failover:(tcp://xxxx:61613,tcp://xxx)?exclusive=true,randomize=false,maxReconnectDelay=30000

there is a errror:

[invalid options: 'exclusize']
Exception AttributeError: "'ActiveMQConsumer' object has no attribute 'client'" in <bound method

Need Unsubscribe Command

I'd like to be able to unsubscribe from a destination.

If I fork (is that the git term?) your project and add this can I push the change back?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.