stompest's People

Contributors

cjrh, hydratk, jimlar, ktdreyer, nikipore, nkvoll, rtkpmcalpine, rtkrruvinskiy, scwicker, theduderog

stompest's Issues

When connect's host param is left as None the server hostname should be used instead of ''

Right now, for STOMP's host header to work at all, host= must be set manually when calling connect. If you don't set it, stompest always leaves the header empty.

Instead of leaving it empty when unset, stompest should use the hostname of the fallback server it is connecting to.

For example, when you use tcp://localhost:61613 in the config and leave host as None, host: localhost should be used.
Likewise, when you use fallback:(tcp://a.stomp.local:61613,tcp://b.stomp.local:61613), leave host as None, and stompest connects to the second server, host: b.stomp.local should be used.
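
A minimal sketch of the proposed fallback, assuming the broker URI has the tcp://host:port form shown above. The helper name default_host is hypothetical and not part of stompest.

```python
from urllib.parse import urlparse


def default_host(broker_uri, host=None):
    """Return the value to put into the STOMP host header.

    An explicitly given host wins; otherwise fall back to the hostname
    of the broker that was actually connected to.
    """
    if host is not None:
        return host
    return urlparse(broker_uri).hostname


print(default_host("tcp://localhost:61613"))
print(default_host("tcp://b.stomp.local:61613"))
print(default_host("tcp://b.stomp.local:61613", host="vhost1"))
```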

Add Python 3 support

Python 3 support would really be awesome, but there are some caveats:

  1. I am in a pure Python 2.7 world at work, so (a) Python 2.7 backward compatibility is a must and (b) I would not be able to guarantee production-level stability. The latter means: contributions are welcome!
  2. It would probably make sense to switch from Twisted to asyncio/Tulip.

Exception <class 'select.error'>: (4, 'Interrupted system call') after upgrading stompest

After upgrading stompest from 2.1.6 to 2.3.0 I get 'Interrupted system call' whenever the application is waiting in canRead and I send a signal to the application.

I think this is due to the Python 3 updates here: 75c5692#diff-715c90981f31edb3e6e79120fa0c7e91L35

stompest used to handle this kind of exception, but I don't think that code path is triggered anymore: the check if code == errno.EINTR is never true now, because code is no longer a number.

I'm running on Python 2.7.13.
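
A minimal sketch of an EINTR check that works for both exception shapes, assuming the Python 2 select.error carries its code in args[0] while OSError exposes it as .errno. The helper names are hypothetical, and this is not the code stompest uses.

```python
import errno


def error_code(exc):
    """Return the numeric error code of exc, or None if it has none."""
    code = getattr(exc, "errno", None)  # OSError / IOError style
    if code is None and exc.args:
        code = exc.args[0]  # select.error on Python 2: args == (code, message)
    return code


def is_interrupted(exc):
    """True if exc reports an interrupted system call (EINTR)."""
    return error_code(exc) == errno.EINTR


# Exception(code, message) mimics the tuple-like Python 2 select.error here.
print(is_interrupted(OSError(errno.EINTR, "Interrupted system call")))
print(is_interrupted(Exception(errno.EINTR, "Interrupted system call")))
```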

Support version negotiation completely

Right now it doesn't look like stompest properly supports version negotiation.

StompConfig only supports a single version. And trying to use the versions parameter to connect on the sync client results in: StompProtocolError: Invalid versions: ['1.1'] [version=1.0]

Ideally, when version(s) is left as None, we should not default to 1.0 either. In that case we should list every version natively supported by stompest in accept-version and then use the version the server announces in the CONNECTED frame's version header. That way we always use the most recent version supported by both the server and stompest.
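
The proposal can be sketched as follows. This is a standalone illustration, not stompest's implementation; SUPPORTED_VERSIONS and both function names are hypothetical.

```python
SUPPORTED_VERSIONS = ("1.0", "1.1", "1.2")  # assumed set of natively supported versions


def accept_version_header(versions=None):
    """Build the accept-version value; None means "offer everything"."""
    offered = SUPPORTED_VERSIONS if versions is None else tuple(versions)
    unknown = [v for v in offered if v not in SUPPORTED_VERSIONS]
    if unknown:
        raise ValueError("unsupported versions: %s" % unknown)
    return ",".join(offered)


def negotiated_version(connected_headers, versions=None):
    """Use the version the server announced in its CONNECTED frame."""
    offered = SUPPORTED_VERSIONS if versions is None else tuple(versions)
    # A STOMP 1.0 broker sends no version header at all.
    version = connected_headers.get("version", "1.0")
    if version not in offered:
        raise ValueError("server chose %s, which was not offered" % version)
    return version


print(accept_version_header())
print(negotiated_version({"version": "1.2", "server": "ActiveMQ/5.8.0"}))
```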

With version=1.2, the consumer never subscribes to the topic

Hi,

When I enable version=1.2 to use heart-beats, the stompest client connects to ActiveMQ successfully, but it never seems to send the frame that subscribes to the topic. The consumer does not show up in the ActiveMQ UI.

The same code works fine when I remove the heartBeats option and set version=1.0.
Any idea?

Here is my relevant code:

# coding: utf8
from __future__ import unicode_literals

import json
import logging
from twisted.internet import reactor, defer
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from fr.sihm.grabInventoryToSupervision.service.InventoryServerListener import InventoryServerListener
import sys, traceback, time

logger = logging


class ConsumerServer(object):
    ERROR_QUEUE = '/queue/testConsumerError'

    def connect(self, IP, port=61613, login=None, password=None):
        if IP is None or IP == "":
            raise ValueError("IP can't be null and can't be empty")

        if login is not None and password is not None:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), login=login,
                                       passcode=password, version="1.2")

        else:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), version="1.2")

    @defer.inlineCallbacks
    def run(self, destinations):

        if not isinstance(destinations, dict):
            raise ValueError("destination can't be null and can't be empty")

        if "serverInventory" not in destinations:
            raise ValueError("You must set the destination to consume db inventory AMQP")

        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER:
            StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize':
            '2000'
        }

        try:
            client = yield Stomp(self._config).connect(headers=headers, heartBeats=(10000, 10000))
            client.disconnected.addCallbacks(
                lambda _: client.disconnected, lambda _: self.reconnect(
                    destinations))
            client.subscribe(
                destinations["serverInventory"],
                headers,
                listener=SubscriptionListener(
                    InventoryServerListener.consume,
                    onMessageFailed=self.manageError))

        except Exception as e:
            logger.warn(
                "Can't connect to ActiveMQ. Retry connexion in 60 seconds. Error: %s",
                str(e))
            logger.warn(str(traceback.format_exc()))
            time.sleep(60)
            self.run(destinations)

    def reconnect(self, destinations):
        logger.warn(
            "The connexion with ActivMQ is lost. Retry connexion in 60 seconds"
        )
        time.sleep(60)
        self.run(destinations)

    def manageError(self, connection, failure, frame, errorDestination):
        logging.error("Error : " + str(failure))

StompSpec.SELECTOR_HEADER does not filter

Even though I subscribe to a topic with a header such as

{ StompSpec.SELECTOR_HEADER: "" }

all messages are received, which means that no filtering occurs. I tested with RabbitMQ 3.0.4.

Reconnection after STOMP server restarting

When the STOMP server stops, the consumer raises an exception but does not reconnect:

2013-02-03 23:12:34+0400 [StompProtocol,client] No handlers could be found for logger "stompest.async.client"
2013-02-03 23:12:34+0400 [StompProtocol,client] Unhandled error in Deferred:
2013-02-03 23:12:34+0400 [StompProtocol,client] Unhandled Error
    Traceback (most recent call last):
    Failure: stompest.error.StompConnectionError: Unexpected connection loss [Connection was closed cleanly.]

2013-02-03 23:12:34+0400 [StompProtocol,client] Stopping factory <stompest.async.protocol.StompFactory instance at 0x1814908>

failover doesn't seem to be working

I'm trying to use async stompest (2.3.0) with RabbitMQ (3.7.18), and almost everything works except the failover feature. I configured the client with a failover URI, then restarted the broker, and the client did not reconnect.
The DisconnectListener printed error messages about the connection being closed, but I haven't seen any attempt by StompFailoverTransport to reconnect.
I put a breakpoint in StompFailoverTransport.__iter__; it is triggered once (on client connect) and never again.

Asynchronous receiving implementation can cause overload at high message rates

Currently, async.Stomp._onFrame and async.Stomp._onMessage call self._notify to dispatch the callbacks for the message. self._notify wraps the notification in a task and dispatches it asynchronously, decoupled from the creation of the StompFrame in StompProtocol.dataReceived. If messages arrive fast enough, stompest can end up receiving them and creating StompFrames for them faster than the event loop dispatches the notifications. This can make the Python process's memory use balloon very quickly as the backlog grows. The gc then kicks in very aggressively, slowing everything down even further.

I've experimented with synchronous notifications in _onFrame and _onMessage, and that makes the pipeline keep up much better, with flat memory use and much less message backup.
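
The effect can be illustrated with a toy model. It is not stompest or Twisted code: it only counts frames that have been parsed but not yet handed to a handler. The function name and all numbers are hypothetical, and callbacks_per_tick is assumed to be positive.

```python
from collections import deque


def peak_backlog(total_frames, frames_per_tick, callbacks_per_tick, synchronous):
    """Return the largest number of parsed-but-unhandled frames held at once."""
    pending = deque()
    received = 0
    peak = 0
    while received < total_frames or pending:
        # One "dataReceived" call parses a burst of frames.
        burst = min(frames_per_tick, total_frames - received)
        received += burst
        if not synchronous:
            # Asynchronous dispatch: each notification is queued for later.
            pending.extend(range(burst))
        # Synchronous dispatch handles each frame right away, so nothing queues up.
        peak = max(peak, len(pending))
        # The event loop only gets to run a limited number of queued callbacks.
        for _ in range(min(callbacks_per_tick, len(pending))):
            pending.popleft()
    return peak


print(peak_backlog(1000, 10, 2, synchronous=False))
print(peak_backlog(1000, 10, 2, synchronous=True))
```

In this model the asynchronous backlog grows for as long as frames arrive faster than callbacks run, while the synchronous variant never holds more than the frame currently being handled.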

Producer example doesn't work

The producer in the example disconnects before it receives the RECEIPT frames from the server:

avsytar@Watcher:~/проекты/Print_System$ python producer.py 
INFO:stompest.async.protocol:Connecting to 192.168.1.214:61613 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [headers={u'passcode': 'guest', u'login': 'guest', u'host': '', u'accept-version': '1.0,1.1,1.2'}, version=1.0]
DEBUG:stompest.async.protocol:Received CONNECTED frame [headers={u'session': u'ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2783', u'heart-beat': u'0,5000', u'version': u'1.2', u'server': u'ActiveMQ/5.8.0'}, version=1.0]
INFO:stompest.async.client:Connected to stomp broker [session=ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2783, version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-0'}, body='{"count": 0}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-1'}, body='{"count": 1}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-2'}, body='{"count": 2}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-3'}, body='{"count": 3}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-4'}, body='{"count": 4}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-5'}, body='{"count": 5}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-6'}, body='{"count": 6}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-7'}, body='{"count": 7}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-8'}, body='{"count": 8}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-9'}, body='{"count": 9}', version=1.2]
INFO:stompest.async.listener:Disconnecting ...
DEBUG:stompest.async.protocol:Sending DISCONNECT frame [version=1.2]
avsytar@Watcher:~/проекты/Print_System$ 

But if it does not disconnect:

avsytar@Watcher:~/проекты/Print_System$ python producer.py 
INFO:stompest.async.protocol:Connecting to 192.168.1.214:61613 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [headers={u'passcode': 'guest', u'login': 'guest', u'host': '', u'accept-version': '1.0,1.1,1.2'}, version=1.0]
DEBUG:stompest.async.protocol:Received CONNECTED frame [headers={u'session': u'ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2787', u'heart-beat': u'0,5000', u'version': u'1.2', u'server': u'ActiveMQ/5.8.0'}, version=1.0]
INFO:stompest.async.client:Connected to stomp broker [session=ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2787, version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-0'}, body='{"count": 0}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-1'}, body='{"count": 1}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-2'}, body='{"count": 2}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-3'}, body='{"count": 3}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-4'}, body='{"count": 4}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-5'}, body='{"count": 5}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-6'}, body='{"count": 6}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-7'}, body='{"count": 7}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-8'}, body='{"count": 8}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-9'}, body='{"count": 9}', version=1.2]
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-0'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-1'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-2'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-3'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-4'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-5'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-6'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-7'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-8'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-9'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Sending heart-beat
^C

Improve integration tests

Some integration tests do not adapt to all brokers (ActiveMQ, RabbitMQ, Apollo), and some which used to work in former versions of those brokers are broken with the current versions.

Why are some failover parameters commented out?

I tried to use failover:(xxx)?timeout=xxx, but stompest does not support this. I then read the source code and found that some transport options are commented out in src/core/stompest/protocol/failover.py

    _SUPPORTED_OPTIONS = { 
        'initialReconnectDelay': _configurationOption(int, 10) 
        , 'maxReconnectDelay': _configurationOption(int, 30000)
        , 'useExponentialBackOff': _configurationOption(_bool, True)
        , 'backOffMultiplier': _configurationOption(float, 2.0)
        , 'maxReconnectAttempts': _configurationOption(int, -1) 
        , 'startupMaxReconnectAttempts': _configurationOption(int, 0)
        , 'reconnectDelayJitter': _configurationOption(int, 0)
        , 'randomize': _configurationOption(_bool, True)
        , 'priorityBackup': _configurationOption(_bool, False)
        #, 'backup': _configurationOption(_bool, False), # initialize and hold a second transport connection - to enable fast failover
        #, 'timeout': _configurationOption(int, -1), # enables timeout on send operations (in miliseconds) without interruption of reconnection process
        #, 'trackMessages': _configurationOption(_bool, False), # keep a cache of in-flight messages that will flushed to a broker on reconnect
        #, 'maxCacheSize': _configurationOption(int, 131072), # size in bytes for the cache, if trackMessages is enabled
        #, 'updateURIsSupported': _configurationOption(_bool, True), # determines whether the client should accept updates to its list of known URIs from the connected broker
    } 

The timeout parameter is simply commented out.
I could not find the reason for this.
Could someone explain it?
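
For the options that are enabled, the reconnect delay schedule can be sketched as below. The option names and defaults are taken from the snippet above; the formula itself and the millisecond unit are assumptions modelled on ActiveMQ's failover transport, not stompest's actual code.

```python
import itertools


def reconnect_delays(initialReconnectDelay=10, maxReconnectDelay=30000,
                     useExponentialBackOff=True, backOffMultiplier=2.0):
    """Yield successive reconnect delays (assumed to be milliseconds)."""
    delay = initialReconnectDelay
    while True:
        yield min(delay, maxReconnectDelay)
        if useExponentialBackOff:
            # Grow the delay, but never beyond the configured maximum.
            delay = min(delay * backOffMultiplier, maxReconnectDelay)


print(list(itertools.islice(reconnect_delays(), 5)))
```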

Concurrent connection attempts are impossible

Someone asked you about it here, and you said:

you would never want to send two CONNECT frames over the same wire-level connection, which is why there is an @exclusive wrapper around the connect() method. If you want to run more than one STOMP session, then you'll have to instantiate a dedicated async.Stomp connection for each STOMP session

But the @exclusive wrapper doesn't pay any attention to what connection it's being called on. It prevents all concurrent calls to the connect method, in all objects.
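
A simplified, synchronous sketch of a guard that tracks its state per object rather than per method. The real wrapper deals with Twisted Deferreds, which this sketch leaves out. The names exclusive_per_instance and Connection are hypothetical.

```python
import functools


def exclusive_per_instance(method):
    """Reject overlapping calls to method, tracked separately for each object."""
    flag = "_running_" + method.__name__

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if getattr(self, flag, False):
            raise RuntimeError("%s already in progress on this object" % method.__name__)
        setattr(self, flag, True)  # the state lives on the instance, not on the decorator
        try:
            return method(self, *args, **kwargs)
        finally:
            setattr(self, flag, False)
    return wrapper


class Connection(object):  # hypothetical stand-in for a client object
    @exclusive_per_instance
    def connect(self, while_connecting=None):
        # while_connecting simulates other work that starts in the middle of a connect
        if while_connecting is not None:
            return while_connecting()
        return "connected"


a, b = Connection(), Connection()
print(a.connect(while_connecting=b.connect))  # a second object may connect meanwhile
```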

Add SSL support

The URI scheme supports only TCP, no SSL (the author doesn't need it because the client is run in "safe" production environments). For the async client, it should be straightforward to support SSL by means of the Endpoint API. Contributions are welcome!

STOMP over websocket

Just in case anyone wants to emulate a browser: this uses websocket-client to implement the transport for stompest. It's a bit hacky, but it works. You provide a message receive handler, and messages can be sent normally with the client.

usage:

from stompest.protocol import StompSpec
from wsstomp import WSStompClient

# The handler is called with the client first (see handle_frame in wsstomp.py).
def receive_message(client, command, data, headers):
    """Handle a STOMP message.  data is parsed JSON"""
    print(f"Message received: {command}")

ws_client = WSStompClient("<ws-url>", on_receive=receive_message, host="<ws-host>", origin="<origin>", headers={
    "Sec-WebSocket-Protocol": "v12.stomp"
}, debug=False)
ws_client.connect(versions=[StompSpec.VERSION_1_2], heartBeats=(0, 0), host="<hostname>")
token = ws_client.subscribe("/user/messages")

wsstomp.py:

import json
import logging
import time
from urllib import parse
from threading import Thread

import websocket
from stompest.config import StompConfig
from stompest.protocol import StompParser, StompSpec
from stompest.sync import client
from websocket import ABNF


class WSTransport(object):
    """Stomp client handles connect only"""
    def __init__(self, url, host=None, origin=None, headers=None, debug=False,
                 on_frame_received=None, **kwargs):
        if debug:
            websocket.enableTrace(True)
        else:
            logging.getLogger("websocket").setLevel(logging.ERROR)
        self.ws = websocket.WebSocketApp(url, on_open=self._on_open,
                                         on_message=self._on_message,
                                         on_error=self._on_error,
                                         on_close=self._on_close,
                                         on_data=self._on_data,
                                         header=[f"{k}: {v}" for k, v in (headers or {}).items()],
                                         **kwargs)
        self.url = url
        self.debug = debug
        self.host = host or parse.urlparse(url).hostname
        self.origin = origin
        self.opened = False
        self.connected = False
        self._parser = StompParser()
        self.on_frame_received = on_frame_received

    def _connect(self, timeout=None):
        thread = Thread(target=lambda: self.ws.run_forever(host=self.host, origin=self.origin), daemon=True)
        thread.start()

        start = time.time()
        while not self.opened:
            time.sleep(0.25)
            if timeout and (time.time() - start) * 1000 >= timeout:
                raise TimeoutError(f"Connection to {self.url} timed out")

    def _on_open(self, ws):
        self.opened = True

    def _on_close(self, ws, status_code, message):
        self.connected = False

    def _on_error(self, ws, error):
        print(f"Error: {error}")

    def _on_message(self, ws, data):
        self._parser.add(data)
        if not self.connected:
            self.connected = True
        elif self.on_frame_received:
            frame = self._parser.get()
            self.on_frame_received(frame)

    def _on_data(self, ws, data, data_type, cont_flag):
        pass

    def canRead(self, timeout=None):
        return True

    def connect(self, timeout=None):
        self._connect(timeout)

    def disconnect(self):
        self.ws.on_close = None
        self.ws.close()
        self.connected = False

    def receive(self):
        # Used on connect only
        return self._parser.get()

    def send(self, stomp_frame):
        # Incoming: StompFrame => outgoing ABNF
        ws_frame = ABNF.create_frame(bytes(stomp_frame), ABNF.OPCODE_BINARY)
        self.ws.sock.send_frame(ws_frame)

    def setVersion(self, version):
        self._parser.version = version


class WSStompClient(client.Stomp):

    def __init__(self, url, url_params=None, config=None, debug=False, on_receive=None, **kwargs):
        pstr = "&".join([f"{parse.quote(key)}={parse.quote(str(value))}" for key, value in (url_params or {}).items()])
        url = f"{url.replace('https', 'wss')}?{pstr}"
        if debug:
            print(f"url {url}")
        parsed = parse.urlparse(url)
        # Provide a dummy config; websocket-client does the actual connection
        super().__init__(config or StompConfig(f"tcp://{parsed.hostname}:443", version=StompSpec.VERSION_1_2))
        self.debug = debug
        def handle_frame(frame):
            if frame.command == StompSpec.MESSAGE:
                self.ack(frame)
            if on_receive:
                on_receive(self, frame.command, json.loads(frame.body.decode() or "{}"), frame.headers)
        factory = lambda host, port, sslContext: WSTransport(url, debug=debug,
                                                             on_frame_received=handle_frame, **kwargs)
        self._transportFactory = factory

Listener API for async client

Factor a Listener API out of the async client which has callbacks for all events like onMessage(), onError(), onConnected(), ... One could add and remove any number of Listener instances per queue (observer pattern). The client would then more clearly represent the connection with its associated StompSession. This is a generalization of the current handlers, which deal only with onMessage() and cannot be removed.

gevent support

It would be nice to have explicit gevent support. Right now stompest will work fine in gevent if you monkey patch python's socket and select. But it would be nice to have stompest work without using monkey patching.

Here's a demo script showing stompest with gevent:
https://gist.github.com/dantman/5290438

Running that script right now will work due to the monkey patch lines. But see what happens when you comment out the two monkey.patch_*() lines.

Ideally stompest would either have explicit gevent support in the sync client or there would be a stompest.gevent that could be imported instead of stompest.sync.

ipv6 support

Does not work with v6 addresses

File "/usr/lib/python2.6/site-packages/stompest-2.1.6-py2.6.egg/stompest/protocol/failover.py", line 189, in _parse
raise ValueError('invalid uri: %s [%s]' % (self.uri, msg))
exceptions.ValueError: invalid uri: tcp://xxx:xxx:c081:20::70:65001 [invalid broker(s): 'NoneType' object has no attribute 'groupdict']
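
For comparison, URI syntax (RFC 3986) puts IPv6 literals in square brackets so that the port can be told apart from the address. The sketch below shows how the standard library splits such a URI. Whether stompest accepts the bracketed form is not stated here, and the address used is a made-up documentation address.

```python
from urllib.parse import urlsplit


def split_broker_uri(uri):
    """Split a broker URI into (scheme, host, port)."""
    parts = urlsplit(uri)
    return parts.scheme, parts.hostname, parts.port


print(split_broker_uri("tcp://[2001:db8::70]:65001"))
print(split_broker_uri("tcp://localhost:61613"))
```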

collections.MutableMapping is not available in Python 3.10

collections.MutableMapping has been deprecated since Python 3.3:

DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working

In Python 3.10, it's not usable at all, so stompest does not work with py310 (reported at https://bugzilla.redhat.com/1926350)

It would be great to merge #54 and then edit the parent class of InFlightOperations to be collections.abc.MutableMapping.
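
A minimal sketch of an import that works on both old and new interpreters. The class below is a hypothetical stand-in for InFlightOperations, not its real implementation.

```python
try:
    from collections.abc import MutableMapping  # Python 3.3 and later
except ImportError:
    from collections import MutableMapping  # Python 2.7


class InFlight(MutableMapping):  # hypothetical stand-in for InFlightOperations
    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


ops = InFlight()
ops["receipt-1"] = "pending"
print(dict(ops))
```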

Thread-unsafe issue if we set ack=True in SubscriptionListener

In my case, I set ack=True (which is also the default) when creating a stompest.async.listener.SubscriptionListener instance. The onMessage function then calls connection.ack(frame) after the message handler finishes (stompest.async.listener.py, line 220). But a thread-safety issue comes up if I give the Twisted reactor more threads, e.g. with reactor.suggestThreadPoolSize(size=4).

The reason is that yield connection.ack(frame) can cause the ack to be executed in a thread other than the main thread. If you trace the ack, it ultimately calls twisted.internet.abstract.FileDescriptor.write(self, data). This function is not thread-safe, so some data gets corrupted (twisted.internet.abstract.py, line 357).

As a result, some of the acks are actually missing, although the log shows "Sending ACK frame...".

For now, I set ack=False and send the ack from the main thread to work around this problem.

How to configure exclusive=true?

http://activemq.apache.org/exclusive-consumer.html

The Java code is:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

I have tried to configure the URI:

failover:(tcp://xxxx:61613,tcp://xxx)?exclusive=true,randomize=false,maxReconnectDelay=30000

There is an error:

[invalid options: 'exclusize']
Exception AttributeError: "'ActiveMQConsumer' object has no attribute 'client'" in <bound method
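
The failover URI options configure the transport rather than the consumer. ActiveMQ's STOMP support documents an activemq.exclusive subscription header, in the same family as the activemq.prefetchSize header used elsewhere on this page. A minimal sketch, assuming the header dict is then passed to the client's subscribe() call. The destination name is hypothetical.

```python
QUEUE = "/queue/TEST.QUEUE"  # hypothetical destination name

# Headers for the SUBSCRIBE frame; the values are strings, as in the other examples here.
subscribe_headers = {
    "activemq.exclusive": "true",
}

print(QUEUE, subscribe_headers)
```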

StompFrame and StompHeartBeat __str__ method returns bytes in Python 3

The __str__ method must return a string in Python 3 but for StompFrame and StompHeartBeat it returns bytes. For example:

>>> from stompest.protocol import StompFrame, StompSpec
>>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')])
>>> bytes(frame)
b'SEND\nfoo:bar1\nfoo:bar2\n\n\x00'
>>> str(frame)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __str__ returned non-string (type bytes)

Maybe __str__ should just call info()?
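
A sketch of that suggestion on a simplified, hypothetical Frame class (not the real StompFrame): __bytes__ produces the wire format, while __str__ delegates to info() and returns text.

```python
class Frame(object):  # hypothetical, simplified stand-in for StompFrame
    def __init__(self, command, rawHeaders=None, body=b""):
        self.command = command
        self.rawHeaders = list(rawHeaders or [])
        self.body = body

    def info(self):
        """Human-readable description; always text, never bytes."""
        return "%s frame [headers=%r]" % (self.command, self.rawHeaders)

    def __bytes__(self):
        # Wire format: command line, header lines, blank line, body, NUL byte.
        head = "".join("%s:%s\n" % pair for pair in self.rawHeaders)
        return ("%s\n%s\n" % (self.command, head)).encode("utf-8") + self.body + b"\x00"

    def __str__(self):
        return self.info()


frame = Frame("SEND", rawHeaders=[("foo", "bar1"), ("foo", "bar2")])
print(bytes(frame))
print(str(frame))
```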

Incorrectly expects session header to be present in CONNECTED

stompest throws this error. However, according to the spec the session header is optional, both in versions 1.1 and 1.2.

~$ ./consumer.py 
Traceback (most recent call last):
  File "./consumer.py", line 11, in <module>
    client.connect()
  File "/usr/local/lib/python2.7/dist-packages/stompest/sync/client.py", line 100, in connect
    self._connect(headers, versions, host, heartBeats, connectedTimeout)
  File "/usr/local/lib/python2.7/dist-packages/stompest/sync/client.py", line 113, in _connect
    self.session.connected(frame)
  File "/usr/local/lib/python2.7/dist-packages/stompest/protocol/session.py", line 213, in connected
    (self.version, self._server, self._id, (self._serverHeartBeat, self._clientHeartBeat)) = commands.connected(frame, versions=self._versions)
  File "/usr/local/lib/python2.7/dist-packages/stompest/protocol/commands.py", line 241, in connected
    raise StompProtocolError('Invalid %s frame (%s header is missing) [headers=%s]' % (StompSpec.CONNECTED, StompSpec.SESSION_HEADER, headers))
stompest.error.StompProtocolError: Invalid CONNECTED frame (session header is missing) [headers={'version': '1.2'}]

How to dynamically increase the number of consumers

Hi,

I want to dynamically increase the number of consumers as the number of messages in the queues grows. What approach should I consider to achieve this? Any help would be appreciated.

I am using ActiveMQ with stompest.async.

Thanks.

Sample code from stompest that I used:

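import json
import logging

from twisted.internet import defer, reactor

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.protocol import StompSpec


class Consumer(object):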
    QUEUE = '/queue/testIn'
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self):
        client = Stomp(self.config)
        yield client.connect()
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize': '100',
        }
        client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE))


    def consume(self, client, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = json.loads(frame.body.decode())
        print('Received frame with count %d' % data['count'])

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Consumer().run()
    reactor.run()

Support for asyncio

I see that there is support for the Twisted framework. Are there plans to move to asyncio in Python 3.x?

Support for repeated header entries (STOMP 1.2)

According to the STOMP 1.2 protocol, a client SHOULD keep the first header entry and MAY ignore repeated header entries.

Currently, the stompest API uses a dictionary to store headers, so the internal structure of StompFrame.headers would have to be changed, ideally without breaking too much of existing client code (although breaking changes are allowed, that's why stompest 2 is still alpha). @theduderog suggested to implement a structure similar to werkzeug's MultiDict.

I am fine without since it is a MAY feature which I don't like too much because in my opinion it mainly complicates things, except for the very appealing usage as routing envelopes à la ZeroMQ, but if anybody insists on a full implementation with repeated headers, I'm willing to assist.
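
The "keep the first entry" rule can be sketched on a plain list of (name, value) pairs, without committing to any particular internal structure for StompFrame.headers. The function name is hypothetical.

```python
def first_wins(raw_headers):
    """Collapse repeated header entries, keeping the first value for each name."""
    headers = {}
    for name, value in raw_headers:
        headers.setdefault(name, value)  # later duplicates are ignored
    return headers


print(first_wins([("foo", "bar1"), ("foo", "bar2"), ("destination", "/queue/testIn")]))
```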

authentication example?

Hi,

Sorry for asking here, but google group has no members...

I'm trying to convert the following java example to python using stompest:

StompConnection connection = new StompConnection();
connection.open("datafeeds.networkrail.co.uk", 61618);
connection.connect("system", "manager");
StompFrame connect = connection.receive();
if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
throw new Exception ("Not connected");
}
connection.subscribe("/topic/TRAIN_MVT_ALL_TOC", Subscribe.AckModeValues.CLIENT);
connection.begin("tx2");
StompFrame message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
connection.commit("tx2");
connection.disconnect();

How do I pass a username and password when connecting using stompest?

cheers,

Chris
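
A rough sketch of the same flow with the synchronous stompest client, offered as a starting point rather than a tested port. It assumes StompConfig's login and passcode parameters, as used in another example on this page. It leaves out the tx2 transaction and acknowledges each message individually. The function name is hypothetical.

```python
def print_two_messages(uri, login, passcode, destination):
    """Connect with credentials, then print and acknowledge two messages."""
    # Imported here so that this module still loads where stompest is not installed.
    from stompest.config import StompConfig
    from stompest.protocol import StompSpec
    from stompest.sync import Stomp

    client = Stomp(StompConfig(uri, login=login, passcode=passcode))
    client.connect()
    client.subscribe(destination, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    for _ in range(2):
        frame = client.receiveFrame()
        print(frame.body)
        client.ack(frame)
    client.disconnect()
```

With the values from the Java example, the call would be print_two_messages("tcp://datafeeds.networkrail.co.uk:61618", "system", "manager", "/topic/TRAIN_MVT_ALL_TOC").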

support SSL client auth in stompest.async

I'm attempting to authenticate to STOMP on ActiveMQ that requires SSL clients to present a x509 keypair in order to connect.

For the stompest sync client, it is really simple, I just have to provide the public cert and key to my ssl context with load_cert_chain():

context = ssl.create_default_context()
context.load_cert_chain(certfile="kdreyer.pem", keyfile='kdreyer.key')
...
CONFIG = StompConfig(BROKER, sslContext=context)

... and then I can receive messages in my queue, etc.

Unfortunately this does not work for the stompest async client. Here's the error I'm getting

INFO:stompest.async.protocol:Connecting to server.example.com:61612 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [version=1.0]
Unhandled error in Deferred:

INFO:stompest.async.listener:Disconnected: [('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]
ERROR:stompest.async.listener:Disconnect because of failure: Unexpected connection loss [[('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]]
DEBUG:stompest.async.listener:Calling disconnected errback: Unexpected connection loss [[('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]]

I've been looking over Twisted's docs for Client cert auth, but I'm a bit lost as to where I would set those options in stompest.async. Somewhere in util.py ?

Asynch examples on ActiveMQ - Artemis

It's possible that ActiveMQ Artemis will eventually take over and become ActiveMQ 6. Sync messaging works well, but the async examples don't seem to work properly on Artemis.

I also tried changing to the naming used on Artemis for JMS mapping, ie: /queue/testIn to jms.queue.testIn

I've tried it on Artemis 1.5.3 and 2.1.0. Here is a snippet of the transfer.py executing.

ActiveMQ Example:
DEBUG:stompest.async.listener:Handler for message ID:host-63961-1495574070738-3:1:-1:1:9 complete.
DEBUG:stompest.async.protocol:Received MESSAGE frame [headers={u'priority': u'4', u'timestamp': u'1495574098692', u'destination': u'/queue/testIn', u'message-id': u'ID:host-63961-1495574070738-3:1:-1:1:10', u'expires': u'0'}, body='{"count": 9}', version=1.0]

Artemis Example:
DEBUG:stompest.async.protocol:Received MESSAGE frame [headers={u'expires': u'0', u'timestamp': u'1495575970275', u'receipt': u'message-9', u'persistent': u'false', u'priority': u'4', u'destination': u'/queue/testIn', u'redelivered': u'false', u'message-id': u'2147483999', u'subscription': u'subscription//queue/testIn'}, body='{"count": 9}', version=1.0]
ERROR:stompest.async.client:Ignoring message (no handler found): 2147483999 [MESSAGE frame [headers={u'expires': u'0', u'timestamp': u'1495575970275', u'receipt': u'message-9', u'persistent': u'false', u'priority': u'4', u'destination': u'/queue/testIn', u'redelivered': u'false', u'message-id': u'2147483999', u'subscription': u'subscription//queue/testIn'}, body='{"count": 9}', version=1.0]]

receiveFrame does not accept a timeout parameter

Currently the synchronous version of stompest does not accept a timeout parameter in the method receiveFrame(). While one could use the method canRead() on a queue with a single consumer, execution can still block indefinitely if another consumer fetches a frame between the calls to canRead() and receiveFrame(). Notably, both connect() and canRead() accept a timeout parameter, although neither is particularly blocking in nature.
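
To illustrate the requested behaviour, here is a minimal standard-library sketch of a read that gives up after a deadline instead of blocking forever. It is not stompest code: receive_frame_bytes is a hypothetical helper working on a plain socket, and it assumes the frame ends at the first NUL byte (so it ignores content-length bodies containing NUL). On timeout it simply returns None and drops any partial data.

```python
import socket
import time


def receive_frame_bytes(sock, timeout):
    """Read until a NUL frame terminator arrives, or return None after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    buffer = b""
    while b"\x00" not in buffer:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Bound each recv() by the time left, so the whole call honours the deadline.
        sock.settimeout(remaining)
        try:
            chunk = sock.recv(4096)
        except socket.timeout:
            return None
        if not chunk:
            raise ConnectionError("connection closed by peer")
        buffer += chunk
    return buffer
```

Because the wait and the read happen in one call, there is no gap between "is data available?" and "read it" in which the caller could end up blocked without a time limit.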

Invalid SUBSCRIBE format

Using STOMP version 1.1, the subscribe function does not work. STOMP 1.1 requires an id field in the header, which the function does not set, so the server responds with an error saying so. My current workaround is shown below. The code connects to a SimSig session with the interface gateway enabled and outputs all messages that arrive, in a format similar to the National Rail data feeds. Copy of the protocol specification is here

from stompest.config import StompConfig
from stompest.protocol import StompSpec, frame
from stompest.sync import Stomp

CONFIG = StompConfig('tcp://localhost:51515', version=StompSpec.VERSION_1_1)
QUEUE = '/topic/TD_ALL_SIG_AREA'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    # This call is rejected by the server because no id header is sent:
    # client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    # Workaround: build the SUBSCRIBE frame by hand, including the id header.
    subscribe = frame.StompFrame(command='SUBSCRIBE', rawHeaders=[('id', '0'), ('destination', QUEUE), ('ack', 'auto')])
    client.sendFrame(subscribe)
    try:
        while True:
            # Use a distinct name so the imported frame module is not shadowed.
            message = client.receiveFrame()
            print(dict(message))
    finally:
        client.disconnect()
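
For reference, the STOMP specifications require only a destination header on SUBSCRIBE in version 1.0, and both destination and id from version 1.1 on. The following standard-library sketch shows a check for that rule; the table and the function missing_subscribe_headers are hypothetical names for illustration, not part of stompest.

```python
# Headers a SUBSCRIBE frame must carry, per STOMP protocol version.
REQUIRED_SUBSCRIBE_HEADERS = {
    "1.0": ("destination",),
    "1.1": ("destination", "id"),
    "1.2": ("destination", "id"),
}


def missing_subscribe_headers(headers, version):
    """Return the required SUBSCRIBE header names absent from `headers`."""
    return [name for name in REQUIRED_SUBSCRIBE_HEADERS[version] if name not in headers]


# The headers from the failing call: destination and ack, but no id.
headers = {"destination": "/topic/TD_ALL_SIG_AREA", "ack": "client-individual"}
missing = missing_subscribe_headers(headers, "1.1")
```

With the headers above, the check reports the id header as missing for version 1.1, and nothing missing for version 1.0.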

async: subscription listeners should only react to "their" onSubscribe() callback

The SubscriptionListener class of the listeners API is broken as follows: the onSubscribe() method reacts to all new subscriptions, but it should only react to its first invocation. The fix looks as follows:

class SubscriptionListener(listener.SubscriptionListener):
    def onSubscribe(self, connection, frame, context):
        if self._headers is not None:
            return
        return super(SubscriptionListener, self).onSubscribe(connection, frame, context)

stompest consumer is slow for activemq

    if self.client.canRead(0):
        self.frame = self.client.receiveFrame()
        self.client.ack(self.frame)

I use this code with ActiveMQ, and I found that the stompest consumer is slow to consume messages: the rate is about one message per second.

Why? Is it a Python problem?
