Giter Club home page Giter Club logo

txzmq's Introduction

Twisted bindings for 0MQ

image

image

Introduction

txZMQ allows to integrate easily ØMQ sockets into Twisted event loop (reactor).

txZMQ supports both CPython and PyPy and ØMQ library version 2.2.x or 3.2.x.

Documentation is available at ReadTheDocs.

Requirements

C library required:

  • ØMQ library 2.2.x or 3.2.x

Python packages required:

  • pyzmq >= 13 (for CPython & PyPy)
  • Twisted

Details

txZMQ introduces support for general 0MQ sockets by class ZmqConnection that can do basic event loop integration, sending-receiving messages in non-blocking manner, scatter-gather for multipart messages.

txZMQ uses ØMQ APIs to get file descriptor that is used to signal pending actions from ØMQ library IO thread running in separate thread. This is used in a custom file descriptor reader, which is then added to the Twisted reactor.

Upgrading from 0.3.x

If you're upgrading from version 0.3.1 and earlier, please apply following changes to your code:

  • root package name was changed from txZMQ to txzmq, adjust your imports accordingly;
  • ZmqEndpointType.Connect has been renamed to ZmqEndpointType.connect;
  • ZmqEndpointType.Bind has been renamed to ZmqEndpointType.bind;
  • ZmqConnection.__init__ has been changed to accept keyword arguments instead of list of endpoints; if you were using one endpoint, no changes are required; if using multiple endpoints, please look for add_endpoints method.

Hacking

Source code for txZMQ is available at github; forks and pull requests are welcome.

To start hacking, fork at github and clone to your working directory. To use the Makefile (for running unit tests, checking for PEP8 compliance and running pyflakes), you will want to have virtualenv installed (it includes a pip installation).

Create a branch, add some unit tests, write your code, check it and test it! Some useful make targets are:

  • make env
  • make check
  • make test

If you don't have an environment set up, a new one will be created for you in ./env. Additionally, txZMQ will be installed as well as required development libs.

txzmq's People

Contributors

aleksclark avatar aleksi avatar arnimarj avatar davidjfelix avatar eagafonov avatar fladi avatar j0s avatar jamesharr avatar lvh avatar mordae avatar nnseva avatar ralphbean avatar smira avatar stenote avatar svpcom avatar tisdall avatar wbarnha avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

txzmq's Issues

PubSub data corrupt?

I'm trying to port a gevent_pyzmq app over to Twisted, in which I'm doing something like this:

while True:
        message = receiver.recv()
        market_json = zlib.decompress(message)

The equivalent in my txZMQ module:

zf = ZmqFactory()
e = ZmqEndpoint('connect', 'tcp://master.eve-emdr.com:8050')
s = ZmqSubConnection(zf, e)
s.subscribe("")

def doPrint(message, tag=None):
    zlib.decompress(message)
s.gotMessage = doPrint

I'm getting this:

        Traceback (most recent call last):
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/context.py", line 118, in callWithContext
            return self.currentContext().callWithContext(ctx, func, *args, **kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/context.py", line 81, in callWithContext
            return func(*args,**kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/internet/posixbase.py", line 586, in _doReadOrWrite
            why = selectable.doRead()
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/txZMQ/connection.py", line 165, in doRead
            log.callWithLogger(self, self.messageReceived, message)
        --- <exception caught here> ---
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/log.py", line 84, in callWithLogger
            return callWithContext({"system": lp}, func, *args, **kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/log.py", line 69, in callWithContext
            return context.call({ILogContext: newCtx}, func, *args, **kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/context.py", line 118, in callWithContext
            return self.currentContext().callWithContext(ctx, func, *args, **kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/Twisted-12.0.0-py2.7-linux-x86_64.egg/twisted/python/context.py", line 81, in callWithContext
            return func(*args,**kw)
          File "/home/gtaylor/.virtualenvs/thundercuddles/lib/python2.7/site-packages/txZMQ/pubsub.py", line 63, in messageReceived
            self.gotMessage(*reversed(message[0].split('\0', 1)))
          File "server.tac", line 94, in doPrint
            zlib.decompress(buf, -1)
        zlib.error: Error -2 while preparing to decompress data: inconsistent stream state

I started comparing the size of the 'message' variables in both versions of the module, and it appeared that the txZMQ one is about half the size of the gevent_pyzmq one. Furthermore, it looks like a bunch of data is being stuffed in the 'tag' variable randomly. Oddly enough, len(message+tag) always ends up being one character short of what is in my gevent version.

Any ideas?

txzmq.ZmqREQConnection messageReceived code is returning weird results

When I call the following code:

def prime_data(msg):
	print( msg )

poll = ZmqREQConnection(self.factory, ZmqEndpoint("connect", "127.0.0.1:5001))
poll.messageReceived = prime_data
poll.sendMsg(b"*")

I get the following output:

[b'\x7fs\x8d\xf1\xfa\x19G\x08\xbd\xff\x91\xcf\xcb\x14KB', b'', b'ACTUAL CONTENT OF MESSAGE...']

Comparing ZmqREPConnection's and ZmqREQConnection's implementations of messageReceived methods here and here, I'm wondering if there isn't a missing step for ZmqREQConnection's processing of incoming messages.

Perhaps I'm missing something obvious? This isn't exactly show stopper bug, but it does seems like an inconsistent API.

New release

Hello.

Could you please make a new release (here and on PyPI)? Current master contains some improvements of Python3 compatibility which will be handy when I will update Fedora package with Python3 subpackage.

Thank you and have a nice day.
Lumir

push with bind

i run push_pull.py example, using python push_pull.py --method=bind --mode=push, raise
{{{
jjx@ubuntu:~/sources/txZMQ/examples$ python push_pull.py --method=bind --mode=push
producing ['1352354886.76', 'ubuntu']
Unhandled Error
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
DeferredList(beforeResults).addCallback(self._continueFiring)
File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
callbackKeywords=kw)
File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
self._runCallbacks()
File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, _args, *_kw)
--- ---
File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
callable(_args, *_kwargs)
File "push_pull.py", line 45, in produce
s.push(data)
File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
self.send(message)
File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)

File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)

File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)

zmq.core.error.ZMQError: Resource temporarily unavailable

}}}

Does this package support Python 3?

On 19 February 2017, Python 3 will be 3,000 days old! We are interested to see if we can get at least 50% of the Top 5,000 PyPI packages to compatible with Python 3 by that date. We are really close and given that this package is in the PyPI Top 5,000, we seek your assistance in pushing us over that threshold.

So, if this package already supports Python 3 then please consider adding an appropriate trove classifier "Programming Language :: Python :: 3" to this package's PyPI page so that tools can automaticly determine its Python 3 compatibility.

push with bind

i run push_pull.py example, using python push_pull.py --method=bind --mode=push, raise

python push_pull.py --method=bind --mode=push

producing ['1352354886.76', 'ubuntu']
Unhandled Error
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
    DeferredList(beforeResults).addCallback(self._continueFiring)
  File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
    callbackKeywords=kw)
  File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
    self._runCallbacks()
  File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
--- <exception caught here> ---
  File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
    callable(*args, **kwargs)
  File "push_pull.py", line 45, in produce
    s.push(data)
  File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
    self.send(message)
  File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
    self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
  File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)

  File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)

  File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)

zmq.core.error.ZMQError: Resource temporarily unavailable

Timeout handling in REP-REQ

I'm not quite sure this is a bug or if its an intended feature, and I'd like to elaborate on this problem that I've spent some time figuring out:

I'm using REP-REQ in a context where the REP server is unstable. The txzmq implements a timeout on the ZmqREQConnection.sendMsg() method. This timeout works perfectly in itself, but after some digging it seems the timeout feature is not a part of zmg. The problem with this is that the message is still left in zmq REQ queue, even after the caller has already gotten its Errback() called for the this message. If the server becomes available at some later time, the reply will be send back to the REQ client, but it goes nowhere in the txzmq stack.

The second problem is that sending a message to the server is the only way of knowing if the server connection is alive, so one needs to keep sending messages (that will time out) regularly to check if the link is up. When it is up again, the server will be flooded with a stream of these "alive" messages.

My resolution to the issue is to use a variant of the lazy pirate pattern: When sending a message with sendMsg() I install an Errback callback that closes the link, which will flush the connection.

    if not socket:
        socket = ZmqREQConnection(zmq, req_endpoint)

    try:
        d = socket.sendMsg(message, timeout=2)

        def onTimeout(fail):
            socket.shutdown()
            socket = None
            return fail

        d.addErrback(onTimeout)
        return d

    except zmq.error.Again:
        print("Handle me...")

Since this is more an effect of the zmq's REP-REQ implementation than txzmq, I'm not sure this is a bug in txzmq, but I dislike that a message receives a timeout, while still being scheduled for transmission. Perhaps the above logic is required? Perhaps there is some way to cancel the message when it times out?

remove endpoints

Please add the interface for removing endpoints from an existing sockets. As libzmq 3.x supports disconnect, this is all we're looking forward to!

Timeout support

I have looked at the code and it does not seem like there is any support for setting timeouts.

Is that correct, it seems like a pretty fundamental thing for REP/REQ over tcp.

'ZmqREPConnection' object has no attribute '_routingInfo'

In our project we encountered the following failure:


  File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 153, in __init__
    ZmqConnection.__init__(self, *args, **kwargs)
  File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 174, in __init__
    self.doRead()
  File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 281, in doRead
    log.callWithLogger(self, self.messageReceived, message)
--- <exception caught here> ---
  File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 179, in messageReceived
    self._routingInfo[msgId] = routingInfo
exceptions.AttributeError: 'ZmqREPConnection' object has no attribute '_routingInfo'

It seems like the doRead called during initialization of ZmqREPConnection actually gets a message and tries to handle it, before the _routingInfo attribute was initialized later in its __init__.

txzmq/pubsub.py", line 26 -- exception cannot conatenate bytes

File "/home/ar/Envs/a3_pyvcluster/lib/python3.7/site-packages/txzmq/pubsub.py", line 26, in publish |class ZmqSubConnection(ZmqConnection):
self.send(tag + b'\0' + message) | """
TypeError: can only concatenate str (not "bytes") to str

def publish(self, message, tag=b''):
"""
Publish message with specified tag.

    :param message: message data
    :type message: str
    :param tag: message tag
    :type tag: str
    """
    self.send(tag + b'\0' + message) <---- It does not work in python3, I tried both tag and message being bytes and it does not work either

Calls to getsockopt & setsockopt fail

Changes to pyzmq have introduced a bug into txZMQ on operations using getsockopt and setsockopt. I'm using pyzmq 13.0.0 (current on pypi) and txZMQ 0.6.1 (also current on pypi). The bug was caused on January 27th by pyzmq commit zeromq/pyzmq@4ba9728#zmq/core/socket.pyx

Steps to reproduce:

>>> from txzmq import ZmqFactory, ZmqEndpoint, ZmqRouterConnection
>>> zf = ZmqFactory()
>>> ze = ZmqEndpoint('bind', 'tcp://127.0.0.1:6868')
>>> zr = ZmqRouterConnection(zf, ze)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/d324348/foo/lib/python2.6/site-packages/txzmq/connection.py", line 88, in __init__
    self.fd = self.socket.getsockopt(constants.FD)
AttributeError: 'zmq.core.socket.Socket' object has no attribute 'getsockopt'

Patch incoming...

PyPi has obsolete version

PyPi still has version 0.3.1, while the current gitbub source makes that version obsolete with many changes.
Please update the official package so that it can be easily deployed with easy_install, thanks

Rename XREQ/XREP ZmqConnection to REQ/REP

  1. XREQ/XREP names are deprecated now
  2. Even though XREQ/XREP is used to implemented the connection, it looks more like REQ/REP for async case. Message format is enforced, two connections can work together well.
  3. Add separate DEALER/ROUTER connection classes which expose raw interface to sockets (allowing any message patterns).

Incorrect (twice?) shutdown when registered for shutdown

The factory which has been registered for shutdown ...

        self.zmqfactory = ZmqFactory()
        self.zmqfactory.registerForShutdown()
        self.receiver = MyZmqSubConnection(self,self.zmqfactory)

is shutted down later

       self.zmqfactory.shutdown()

As a result, the unhandled exception happens when the reactor is shutted down:

Unhandled Error
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 416, in fireEvent
    DeferredList(beforeResults).addCallback(self._continueFiring)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 307, in addCallback
    callbackKeywords=kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 296, in addCallbacks
    self._runCallbacks()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 578, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
--- <exception caught here> ---
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 429, in _continueFiring
    callable(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/txzmq/factory.py", line 51, in shutdown
    for connection in self.connections.copy():

It probably useful either add an 'active' flag of the factory to avoid conflict when shutting down twice (manually and automatically), or unregister from the reactor to be shutted down automatically to avoid a conflict as such.

Development Plans

Hey folks,

Not sure where else to put this... I guess there's no mail list yet? ;-)

I need to be able to use txZMQ in the same way that I use other Twisted code I'm running right now (using Twisted endpoints). As such, I've created a blueprint to track this work, here:
https://blueprints.launchpad.net/txzmq/+spec/twisted-api-symmetry

Note that I set up a txZMQ git repo import to Launchpad last July, and this is where I'm putting the dev notes. (There's another blueprint there for expanding the examples). I've used Launchpad blueprints for Ubuntu when I was at Canonical, and I'm now using it for OpenStack. It's pretty good for this kind of work tracking.

As for the txZMQ work, right now I'm resisting the urge of creating a custom reactor. Instead am exploring the idea of "faked out" protocols/transports and how to best make that work. The blueprint link above has several links for more info (for the Twisted endpoints API), for the curious.

In the blueprint whiteboard, I've listed the work items. Each items approximately maps to anywhere from one to a handful of branches here on git. (I like to keep branches small, so that they are easy to review and identify possible issues/impact).

If folks are interested in collaborating or proposing different approaches, I guess we can chat about that on this ticket :-) (If there's a better place for that, do let me know.)

Thanks!

pub_sub.py example doesn't seem to work

Hi there!

I can't get the pub_sub.py example to work. Here's the commands I'm running:

./pub_sub.py --endpoint tcp://hopscotch:5050 --mode=publisher
./pub_sub.py --endpoint tcp://hopscotch:5050 --mode=subscriber

The publisher appears to be publishing:
publishing '1430422027.98'
publishing '1430422028.98'
publishing '1430422029.98'
publishing '1430422030.98'

But, the subscriber doesn't seem to be picking up the messages. Any pointers on debugging this?

Change txZMQ (from GPL) to a more permissive license (at least LGPL)

Just want to confirm that you are intentionally prohibiting distribution of this package along with proprietary software. While GPL is certainly a valid license for a library, the LGPL is more common for libraries (e.g. psycopg2) -- and you (appear to) have licensed other packages under more permissive licenses like MPL and MIT.

highWaterMark cannot protect from memory overflow

With pyzmq, setting HWM would protect you from memory overflow issues like this:

import zmq
ctx = zmq.Context()
s= zmq.socket(zmq.PUSH)
s.setsockopt(zmq.HWM, 8)
s.connect('tcp://localhost:33710')  # non-exist endpoint
while True:
    s.send('XXX')  # this will block when high water mark is reached

With txZMQ, ZmqConnection.send() simply append into a deque(), regardless of highWaterMark. Therefore test like above would take up a lot of memory.

If send() could return a Deferred object, the calling code would be able to do something about it.

A limited work around is to set a maxlen on the deque object, so it would drop data when full. Problem is if you really want to wait until the exceptional HWM state is finished (data is very important for example), this won't help.

Latest version of pyzmq incompatible

The latest version of pyzmq (14.0) removes 'zmq.core'. Instead, most of the members can be accessed directly. For example, 'zmq.constants' rather than 'zmq.core.constants'. This breaks txZMQ.

Wrong FSF address in LICENSE.txt

Hi,

It seems you have an old copy of the GPL that gives an erroneous address for the Free Software Foundation. The current address is the one at the top of the file, but there is another address at the end of the file that isn't the same.

Dropped connection on XREQ

I have been using txZMQ with a Twisted server, and have seen sporadic dropped communications, where data is correctly written into the socket, but is never read out of it, nor is any subsequent data. I traced the problem back to the doRead function, which queries ZMQ for the socket events, then does all pending writes, followed by rcv. In the case where the communications get dropped, I am seeing that the initial call to getsockopts for the events returns POLLOUT but no POLLIN. If I call getsockopts a second time after the pending writes are done, then I do get a POLLIN. Adding this second call seems to have fixed the problem, possibly because of a delay between the Twisted polling loop and whatever ZMQ is doing in the background. Have you seen this problem before? Do you have a better solution?

Here is the doRead function as we have it:

    events = self.socket.getsockopt(constants.EVENTS)               -> returns POLLOUT only on the failure case
    if (events & constants.POLLOUT) == constants.POLLOUT:
        while self.queue:
            try:
                self.socket.send(
                    self.queue[0][1], constants.NOBLOCK | self.queue[0][0])
            except error.ZMQError as e:
                if e.errno == constants.EAGAIN:
                    break
                self.queue.popleft()
                raise e

            self.queue.popleft()
    # On rare occasions, when there data to write, this second call to get
    # events may return POLLIN where the first one did not. 
    events = self.socket.getsockopt(constants.EVENTS)           -> I added this, it returns POLLIN on the failure case
    if (events & constants.POLLIN) == constants.POLLIN:
        while True:
            if self.factory is None:  # disconnected
                return
            try:
                message = self._readMultipart()
            except error.ZMQError as e:
                if e.errno == constants.EAGAIN:
                    break

                raise e

            log.callWithLogger(self, self.messageReceived, message)

Thanks very much for taking a look at this!

txZMQ object responsibilities

In my last branch (#14), I kept running into conceptual issues inside txZMQ regarding what is named what, and what's responsible for what.

This is actually not surprising, given:

  1. the age of the project -- young! (and the code base is pretty sweet, even considering that)
  2. the fact that we're actually using zmq socket fd's and this changes so many of the semantics from the standard Twisted approach

Regardless, here are my observations:

  • ZmqEndpoint takes two parameters: a connection type ("bind" or "connect") and a ZeroMQ address
  • ZmqConnection does a connect or bind depending upon the connection type set in the passed ZmqEndpoint instance.
  • ZmqConnection subclasses should be able to have sensible defaults for connection type (that can be overridden if necessary), thus removing one more thing from the developers' task list when using the library
  • it seems that setting the connection type should happen with a connection-oriented object
  • it seems that ZmqEndpoint would be better suited just focusing on the address of an endpoint
  • ZmqConnection is responsible for both connecting and the work that, traditionally in twisted, is done by Protocols and Transports: namely, writing to the fd
  • both client and server code is set up in ZmqConnection; I know that things are somewhat different with ZMQ, but it will be helpful to devs to create some code isolation for the two approaches while at the same time leaving things flexible enough so devs can take full advantage of ZMQ with txZMQ

Proposed solutions (branches!):

  • for txZMQ twisted APIs, use Twisted terminology: rename ZmqEndpoint to ZmqAddress
  • split out connection and address configuration in ZmqEndpoint, putting connection info in ZmqConnection
  • split out the "transport" (writing/reading of fds) from the ZmqConnection object (not sure what exactly... need to experiment and see what fits best after tweaking the other bits)
  • split ZmqConnection into ZmqServerEndpoint and ZmqClientEndpoint

I think once these solutions are in place, the code will be very logically segmented and developers will have a very clear picture of how to use txZMQ in Twisted apps.

Thoughts?

Push your latest commits, @smira :)

@smira, I see that 0.6.2 is on PyPI, but I can't find a commit with that version number on the master branch here on github. My guess is that you have local commits?

Can you push them so I can poke around? :)

Thanks again!

pubsub (at least) doesn't work with python 3

When publishing with this code in python3, an unexpected and unsolvable error happens:

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

from twisted.application import service
from txzmq import ZmqFactory, ZmqEndpoint, ZmqPubConnection
from twisted.internet import reactor
import time

class ZmqPubSubService(service.Service):
    def __init__(self):
        self.zf = ZmqFactory()


    def publish(self):
        data = str(time.time()).encode('ascii')
        print("Publishing {}".format(data))
        self.subconn.publish(data, b'test')
        reactor.callLater(1, self.publish)


    def startService(self):
        self.sube = ZmqEndpoint('bind', 'tcp://127.0.0.1:5557')
        self.push = ZmqEndpoint('bind', 'tcp://127.0.0.1:5558')
        self.subconn = ZmqPubConnection(self.zf, self.sube)
        self.publish()


    def stopService(self):
        del self.sube
        del self.push
        del self.subconn
        #return self.tcpService.stopService()

this traceback happens:

2016-01-09 21:13:36+0100 [-] Publishing b'1452370416.7726436'
2016-01-09 21:13:36+0100 [-] Traceback (most recent call last):
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/bin/cond", line 9, in <module>
2016-01-09 21:13:36+0100 [-]     load_entry_point('con2==2.0', 'console_scripts', 'cond')()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/src/con2/scripts/cond_run.py", line 12, in main
2016-01-09 21:13:36+0100 [-]     run()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/scripts/twistd.py", line 29, in run
2016-01-09 21:13:36+0100 [-]     app.run(runApp, ServerOptions)
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/application/app.py", line 617, in run
2016-01-09 21:13:36+0100 [-]     runApp(config)
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/scripts/twistd.py", line 25, in runApp
2016-01-09 21:13:36+0100 [-]     _SomeApplicationRunner(config).run()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/application/app.py", line 352, in run
2016-01-09 21:13:36+0100 [-]     self.postApplication()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/scripts/_twistd_unix.py", line 207, in postApplication
2016-01-09 21:13:36+0100 [-]     self.startApplication(self.application)
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/scripts/_twistd_unix.py", line 403, in startApplication
2016-01-09 21:13:36+0100 [-]     app.startApplication(application, not self.config['no_save'])
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/application/app.py", line 633, in startApplication
2016-01-09 21:13:36+0100 [-]     service.IService(application).startService()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/Twisted-15.5.0-py3.4.egg/twisted/application/service.py", line 283, in startService
2016-01-09 21:13:36+0100 [-]     service.startService()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/src/con2/services/zmq_pubsub.py", line 27, in startService
2016-01-09 21:13:36+0100 [-]     self.publish()
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/src/con2/services/zmq_pubsub.py", line 19, in publish
2016-01-09 21:13:36+0100 [-]     self.subconn.publish(data, b'test')
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/txZMQ-0.7.4-py3.4.egg/txzmq/pubsub.py", line 26, in publish
2016-01-09 21:13:36+0100 [-]     self.send(tag + b'\0' + message)
2016-01-09 21:13:36+0100 [-]   File "/home/dk/develop/con2/env/lib/python3.4/site-packages/txZMQ-0.7.4-py3.4.egg/txzmq/connection.py", line 312, in send
2016-01-09 21:13:36+0100 [-]     self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
2016-01-09 21:13:36+0100 [-]   File "zmq/backend/cython/socket.pyx", line 617, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:6625)
2016-01-09 21:13:36+0100 [-]   File "zmq/backend/cython/socket.pyx", line 664, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:6363)
2016-01-09 21:13:36+0100 [-]   File "zmq/backend/cython/socket.pyx", line 184, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:2361)
2016-01-09 21:13:36+0100 [-]   File "zmq/utils/buffers.pxd", line 200, in buffers.asbuffer_r (zmq/backend/cython/socket.c:8652)
2016-01-09 21:13:36+0100 [-]   File "zmq/utils/buffers.pxd", line 151, in buffers.asbuffer (zmq/backend/cython/socket.c:7985)
2016-01-09 21:13:36+0100 [-] TypeError: 116 does not provide a buffer interface.

The reason is this code:

tyzmq/connection.py

    def send(self, message):
        """
        Send message via ZeroMQ socket.

        Sending is performed directly to ZeroMQ without queueing. If HWM is
        reached on ZeroMQ side, sending operation is aborted with exception
        from ZeroMQ (EAGAIN).

        After writing read is scheduled as ZeroMQ may not signal incoming
        messages after we touched socket with write request.

        :param message: message data, could be either list of str (multipart
            message) or just str
        :type message: str or list of str
        """
        if not hasattr(message, '__iter__'):
            self.socket.send(message, constants.NOBLOCK)
        else:
            for m in message[:-1]:
                self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
            self.socket.send(message[-1], constants.NOBLOCK)

In python3 bytes and str both have __iter__ methods, so this code enters the wrong path (the one where a multipart message should be sent).
The solution should be something like this, which also works in python2 where bytes==str:

        if type(message) is bytes:
            self.socket.send(message, constants.NOBLOCK)
        elif hasattr(message, '__iter__'):
            for m in message[:-1]:
                self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
            self.socket.send(message[-1], constants.NOBLOCK)

Maybe even an else with a raise could be preferred like this:

        else:
            raise Exception('Only an iterable of bytes or type bytes is allowed as argument.')

Online API reference is missing

I haven't found the docs online

I would also say, that docs, generated by epydoc are even less readable, than the sources.
Think about Sphinx + rtfd.

Thanks for useful project!

Does txZMQ support iocp reactor

Hi.

I noticed ZMQconnection need to call addReader method which is missing in IOCP reactor. I am curious whether txZMQ supports iocp reactor or not. Thanks.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.